You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/03 21:33:10 UTC

[1/5] flink git commit: [FLINK-1656] Filter ForwardedField properties for group-at-a-time operators in Optimizer.

Repository: flink
Updated Branches:
  refs/heads/master b8bb762e3 -> f36eb54ee


[FLINK-1656] Filter ForwardedField properties for group-at-a-time operators in Optimizer.

This closes #525


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dda8565e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dda8565e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dda8565e

Branch: refs/heads/master
Commit: dda8565e6b4f519ac77d66d710a5dc64f1c5740b
Parents: b8bb762
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Mar 23 11:55:34 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 3 20:39:47 2015 +0200

----------------------------------------------------------------------
 docs/programming_guide.md                       |  8 +-
 .../api/java/operators/GroupReduceOperator.java |  2 +-
 .../apache/flink/optimizer/dag/CoGroupNode.java | 39 ++++++++
 .../flink/optimizer/dag/GroupCombineNode.java   | 28 ++++++
 .../flink/optimizer/dag/GroupReduceNode.java    | 28 ++++++
 .../flink/optimizer/dag/MapPartitionNode.java   | 18 ++++
 .../flink/optimizer/dag/SingleInputNode.java    | 16 +++-
 .../flink/optimizer/dag/TwoInputNode.java       | 23 ++++-
 .../flink/optimizer/dag/CoGroupNodeTest.java    | 96 ++++++++++++++++++++
 .../optimizer/dag/GroupCombineNodeTest.java     | 72 +++++++++++++++
 .../optimizer/dag/GroupReduceNodeTest.java      | 71 +++++++++++++++
 .../optimizer/dag/MapPartitionNodeTest.java     | 61 +++++++++++++
 12 files changed, 448 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 4d338d7..d69a303 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -2303,7 +2303,7 @@ env.execute()
 Semantic Annotations
 -----------
 
-Semantic annotations can be used give Flink hints about the behavior of a function. 
+Semantic annotations can be used to give Flink hints about the behavior of a function. 
 They tell the system which fields of a function's input the function reads and evaluates and
 which fields it unmodified forwards from its input to its output. 
 Semantic annotations are a powerful means to speed up execution, because they
@@ -2325,11 +2325,12 @@ The following semantic annotations are currently supported.
 Forwarded fields information declares input fields which are unmodified forwarded by a function to the same position or to another position in the output. 
 This information is used by the optimizer to infer whether a data property such as sorting or 
 partitioning is preserved by a function.
+For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.
 
 Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
 Fields that are forwarded to the same position in the output can be specified by their position. 
 The specified position must be valid for the input and output data type and have the same type.
-For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.
+For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple. 
 
 Fields which are unmodified forwarded to another position in the output are declared by specifying the
 source field in the input and the target field in the output as field expressions.
@@ -2389,7 +2390,8 @@ class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
 
 Non-forwarded fields information declares all fields which are not preserved on the same position in a function's output. 
 The values of all other fields are considered to be preserved at the same position in the output. 
-Hence, non-forwarded fields information is inverse to forwarded fields information.
+Hence, non-forwarded fields information is inverse to forwarded fields information. 
+Non-forwarded field information for group-wise operators such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition` must fulfill the same requirements as for forwarded field information.
 
 **IMPORTANT**: The specification of non-forwarded fields information is optional. However if used, 
 **ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 30f2cc4..c96b7c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -119,7 +119,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 92076c3..20bad0d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -22,13 +22,17 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.CoGroupDescriptor;
 import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
 import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * The Optimizer representation of a <i>CoGroup</i> operator.
@@ -77,6 +81,41 @@ public class CoGroupNode extends TwoInputNode {
 	}
 
 	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		// Builds a reduced copy of the operator's semantic properties: read fields of both inputs are copied, forwarded fields are copied for key fields only.
+		// Local properties for CoGroup may only be preserved on key fields.
+		DualInputSemanticProperties origProps = ((DualInputOperator<?, ?, ?, ?>) getOperator()).getSemanticProperties();
+
+		DualInputSemanticProperties filteredProps = new DualInputSemanticProperties();
+		FieldSet readSet1 = origProps.getReadFields(0);
+		FieldSet readSet2 = origProps.getReadFields(1);
+		if(readSet1 != null) {
+			filteredProps.addReadFields(0, readSet1);
+		}
+		if(readSet2 != null) {
+			filteredProps.addReadFields(1, readSet2);
+		}
+
+		// first input (index 0): copy every forwarding target of each field in keys1
+		for(int f : this.keys1) {
+			FieldSet targets = origProps.getForwardingTargetFields(0, f);
+			for(int t : targets) {
+				filteredProps.addForwardedField(0, f, t);
+			}
+		}
+
+		// second input (index 1): copy every forwarding target of each field in keys2
+		for(int f : this.keys2) {
+			FieldSet targets = origProps.getForwardingTargetFields(1, f);
+			for(int t : targets) {
+				filteredProps.addForwardedField(1, f, t);
+			}
+		}
+
+		return filteredProps;
+	}
+
+	@Override
 	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
 		// for CoGroup, we currently make no reasonable default estimates
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index d25fed9..766d6af 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -19,7 +19,11 @@
 package org.apache.flink.optimizer.dag;
 
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
 import org.apache.flink.optimizer.operators.GroupCombineProperties;
@@ -88,6 +92,30 @@ public class GroupCombineNode extends SingleInputNode {
 		return this.possibleProperties;
 	}
 
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		// Builds a reduced copy of the operator's semantic properties: read fields are copied, forwarded fields are copied for key fields only.
+		// Local properties for GroupCombine may only be preserved on key fields.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+
+		// copy every forwarding target of each key field; if keys is null, no forwarded fields are added at all
+		if(this.keys != null) {
+			for (int f : this.keys) {
+				FieldSet targets = origProps.getForwardingTargetFields(0, f);
+				for (int t : targets) {
+					filteredProps.addForwardedField(f, t);
+				}
+			}
+		}
+		return filteredProps;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Estimates
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 227b75f..51da36b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -23,6 +23,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
@@ -32,6 +35,7 @@ import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupPropertie
 import org.apache.flink.optimizer.operators.GroupReduceProperties;
 import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -135,6 +139,30 @@ public class GroupReduceNode extends SingleInputNode {
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
 		return this.possibleProperties;
 	}
+
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		// Builds a reduced copy of the operator's semantic properties: read fields are copied, forwarded fields are copied for key fields only.
+		// Local properties for GroupReduce may only be preserved on key fields.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+
+		// copy every forwarding target of each key field; if keys is null, no forwarded fields are added at all
+		if(this.keys != null) {
+			for (int f : this.keys) {
+				FieldSet targets = origProps.getForwardingTargetFields(0, f);
+				for (int t : targets) {
+					filteredProps.addForwardedField(f, t);
+				}
+			}
+		}
+		return filteredProps;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Estimates

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
index b287c33..6914c15 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
@@ -22,10 +22,13 @@ package org.apache.flink.optimizer.dag;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * The optimizer's internal representation of a <i>MapPartition</i> operator node.
@@ -55,6 +58,21 @@ public class MapPartitionNode extends SingleInputNode {
 		return this.possibleProperties;
 	}
 
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		// Builds semantic properties that carry only the operator's read fields.
+		// Local properties for MapPartition may not be preserved.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+		// no forwarded-field information is copied into the filtered properties
+		return filteredProps;
+	}
+
 	/**
 	 * Computes the estimates for the MapPartition operator.
 	 * We assume that by default, Map takes one value and transforms it into another value.

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index e9b31f4..61bee58 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -148,7 +148,14 @@ public abstract class SingleInputNode extends OptimizerNode {
 	public SemanticProperties getSemanticProperties() {
 		return getOperator().getSemanticProperties();
 	}
-	
+	// Semantic properties passed to LocalProperties.filterBySemanticProperties; this default returns getSemanticProperties() unchanged.
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
+	// Semantic properties passed to GlobalProperties.filterBySemanticProperties; this default returns getSemanticProperties() unchanged.
+	protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
 
 	@Override
 	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
@@ -467,10 +474,9 @@ public abstract class SingleInputNode extends OptimizerNode {
 			gProps = dps.computeGlobalProperties(gProps);
 			lProps = dps.computeLocalProperties(lProps);
 
-			SemanticProperties props = this.getSemanticProperties();
 			// filter by the user code field copies
-			gProps = gProps.filterBySemanticProperties(props, 0);
-			lProps = lProps.filterBySemanticProperties(props, 0);
+			gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
+			lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
 			
 			// apply
 			node.initProperties(gProps, lProps);
@@ -478,7 +484,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 			target.add(node);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//                                     Branch Handling
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index f3122ba..76b03c1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -607,13 +607,18 @@ public abstract class TwoInputNode extends OptimizerNode {
 			DualInputPlanNode node = operator.instantiate(in1, in2, this);
 			node.setBroadcastInputs(broadcastChannelsCombination);
 
-			SemanticProperties props = this.getSemanticProperties();
-			GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
-			GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
+			SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
+			GlobalProperties gp1 = in1.getGlobalProperties().clone()
+					.filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
+			GlobalProperties gp2 = in2.getGlobalProperties().clone()
+					.filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
 			GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
 
-			LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
-			LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
+			SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
+			LocalProperties lp1 = in1.getLocalProperties().clone()
+					.filterBySemanticProperties(semPropsLocalPropFiltering, 0);
+			LocalProperties lp2 = in2.getLocalProperties().clone()
+					.filterBySemanticProperties(semPropsLocalPropFiltering, 1);
 			LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
 			
 			node.initProperties(combined, locals);
@@ -722,6 +727,14 @@ public abstract class TwoInputNode extends OptimizerNode {
 	public SemanticProperties getSemanticProperties() {
 		return getOperator().getSemanticProperties();
 	}
+	// Semantic properties used to filter the local properties of both inputs; this default returns getSemanticProperties() unchanged.
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
+	// Semantic properties used to filter the global properties of both inputs; this default returns getSemanticProperties() unchanged.
+	protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//                                     Miscellaneous

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
new file mode 100644
index 0000000..96b6d03
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CoGroupNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+		// Keys are {3,2} on the first input and {6,3} on the second; only forwards of key fields are expected to survive, read fields are expected unchanged.
+		DualInputSemanticProperties origProps = new DualInputSemanticProperties();
+		// props for first input
+		origProps.addForwardedField(0, 0, 1);
+		origProps.addForwardedField(0, 2, 2);
+		origProps.addForwardedField(0, 3, 4);
+		origProps.addForwardedField(0, 6, 0);
+		origProps.addReadFields(0, new FieldSet(0, 2, 4, 7));
+		// props for second input
+		origProps.addForwardedField(1, 1, 2);
+		origProps.addForwardedField(1, 2, 8);
+		origProps.addForwardedField(1, 3, 7);
+		origProps.addForwardedField(1, 6, 6);
+		origProps.addReadFields(1, new FieldSet(1, 3, 4));
+
+		CoGroupOperatorBase<?,?,?,?> op = mock(CoGroupOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getKeyColumns(1)).thenReturn(new int[]{6,3});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		CoGroupNode node = new CoGroupNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		// check first input forwarded fields: non-key sources 0 and 6 are dropped, key sources 2 and 3 are kept
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+		// check first input read fields (copied unchanged)
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+		// check second input forwarded fields: non-key sources 1 and 2 are dropped, key sources 3 and 6 are kept
+		assertTrue(filteredProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 3).contains(7));
+		assertTrue(filteredProps.getForwardingTargetFields(1, 6).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 6).contains(6));
+		assertTrue(filteredProps.getForwardingSourceField(1, 2) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(1, 8) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(1, 7) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(1, 6) == 6);
+		// check second input read fields (copied unchanged)
+		assertTrue(filteredProps.getReadFields(1).size() == 3);
+		assertTrue(filteredProps.getReadFields(1).contains(1));
+		assertTrue(filteredProps.getReadFields(1).contains(3));
+		assertTrue(filteredProps.getReadFields(1).contains(4));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
new file mode 100644
index 0000000..f4776a0
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GroupCombineNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+		// Keys are {3,2}; only forwards of key fields are expected to survive, read fields are expected unchanged.
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addForwardedField(3, 4);
+		origProps.addForwardedField(6, 0);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		GroupCombineOperatorBase<?,?,?> op = mock(GroupCombineOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		GroupCombineNode node = new GroupCombineNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+		// check forwarded fields: non-key sources 0 and 6 are dropped, key sources 2 and 3 are kept
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+		// check read fields (copied unchanged)
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
new file mode 100644
index 0000000..da8a0b4
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GroupReduceNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addForwardedField(3, 4);
+		origProps.addForwardedField(6, 0);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		GroupReduceOperatorBase<?,?,?> op = mock(GroupReduceOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		GroupReduceNode node = new GroupReduceNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
new file mode 100644
index 0000000..c9c6b50
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MapPartitionNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		MapPartitionOperatorBase<?,?,?> op = mock(MapPartitionOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{});
+
+		MapPartitionNode node = new MapPartitionNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+
+}


[5/5] flink git commit: [FLINK-1664] Adds check if a selected sort key is sortable

Posted by fh...@apache.org.
[FLINK-1664] Adds check if a selected sort key is sortable

This closes #541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f36eb54e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f36eb54e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f36eb54e

Branch: refs/heads/master
Commit: f36eb54ee6d8cc130439def98559b6b0a70b6c7b
Parents: f39aec8
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Mar 27 21:37:59 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 3 20:42:05 2015 +0200

----------------------------------------------------------------------
 .../api/common/typeinfo/TypeInformation.java    |  10 +-
 .../api/common/typeutils/CompositeType.java     |  20 ++
 .../flink/api/java/SortPartitionOperator.java   |  30 +-
 .../flink/api/java/operators/DataSink.java      |  28 ++
 .../apache/flink/api/java/operators/Keys.java   |  14 +-
 .../api/java/operators/SortedGrouping.java      |  71 +++--
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   7 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  19 --
 .../flink/api/java/operator/DataSinkTest.java   |  46 +--
 .../flink/api/java/operator/GroupingTest.java   | 278 +++++++++++++++++--
 .../api/java/operator/SortPartitionTest.java    | 204 ++++++++++++++
 11 files changed, 637 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 4fa02e3..bb50e32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -132,7 +132,15 @@ public abstract class TypeInformation<T> implements Serializable {
 	 * @return True, if the type can be used as a key, false otherwise.
 	 */
 	public abstract boolean isKeyType();
-	
+
+	/**
+	 * Checks whether this type can be used as a key for sorting.
+	 * The order produced by sorting this type must be meaningful.
+	 */
+	public boolean isSortKeyType() {
+		return isKeyType();
+	}
+
 	/**
 	 * Creates a serializer for the type. The serializer may use the ExecutionConfig
 	 * for parameterization.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 54a1e13..de39ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -169,6 +169,26 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 		return getFieldIndex(fieldName) >= 0;
 	}
 
+	@Override
+	public boolean isKeyType() {
+		for(int i=0;i<this.getArity();i++) {
+			if (!this.getTypeAt(i).isKeyType()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public boolean isSortKeyType() {
+		for(int i=0;i<this.getArity();i++) {
+			if (!this.getTypeAt(i).isSortKeyType()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
 	/**
 	 * Returns the names of the composite fields of this type. The order of the returned array must
 	 * be consistent with the internal field index ordering.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
index c8f8bbc..988144b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 import java.util.Arrays;
 
@@ -96,11 +98,23 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 
 	private int[] getFlatFields(int field) {
 
+		if(!(super.getType() instanceof TupleTypeInfoBase<?>)) {
+			throw new InvalidProgramException("Field positions can only be specified on Tuple or " +
+					"Case Class types.");
+		}
+		else {
+			// check selected field is sortable type
+			TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field);
+			if (!sortKeyType.isSortKeyType()) {
+				throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+			}
+		}
+
 		Keys.ExpressionKeys<T> ek;
 		try {
 			ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType());
 		} catch(IllegalArgumentException iae) {
-			throw new InvalidProgramException("Invalid specification of field expression.", iae);
+			throw new InvalidProgramException("Invalid specification of field position.", iae);
 		}
 		return ek.computeLogicalKeyPositions();
 	}
@@ -108,6 +122,13 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 	private int[] getFlatFields(String fields) {
 
 		if(super.getType() instanceof CompositeType) {
+
+			// check selected field is sortable type
+			TypeInformation<?> sortKeyType = ((CompositeType<?>) super.getType()).getTypeAt(fields);
+			if (!sortKeyType.isSortKeyType()) {
+				throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+			}
+
 			// compute flat field positions for (nested) sorting fields
 			Keys.ExpressionKeys<T> ek;
 			try {
@@ -123,6 +144,12 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 				throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " +
 						"Use a field wildcard for that (\"*\" or \"_\")");
 			} else {
+
+				// check if selected field is sortable type
+				if (!super.getType().isSortKeyType()) {
+					throw new InvalidProgramException("Selected sort key cannot be sorted: " + super.getType());
+				}
+
 				return new int[]{0};
 			}
 		}
@@ -149,7 +176,6 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 83ec021..5b5b031 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.api.java.DataSet;
@@ -114,6 +115,7 @@ public class DataSink<T> {
 		if (field >= this.type.getArity()) {
 			throw new InvalidProgramException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
 
 		// get flat keys
 		Keys.ExpressionKeys<T> ek;
@@ -166,9 +168,11 @@ public class DataSink<T> {
 		Order[] orders;
 
 		if(this.type instanceof CompositeType) {
+
 			// compute flat field positions for (nested) sorting fields
 			Keys.ExpressionKeys<T> ek;
 			try {
+				isValidSortKeyType(fieldExpression);
 				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
 			} catch(IllegalArgumentException iae) {
 				throw new InvalidProgramException("Invalid specification of field expression.", iae);
@@ -183,6 +187,8 @@ public class DataSink<T> {
 				throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " +
 						"Use a field wildcard for that (\"*\" or \"_\")");
 			} else {
+				isValidSortKeyType(fieldExpression);
+
 				numFields = 1;
 				fields = new int[]{0};
 				orders = new Order[]{order};
@@ -208,6 +214,28 @@ public class DataSink<T> {
 		return this;
 	}
 
+	private void isValidSortKeyType(int field) {
+		TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) this.type).getTypeAt(field);
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
+	private void isValidSortKeyType(String field) {
+		TypeInformation<?> sortKeyType;
+
+		field = field.trim();
+		if(field.equals("*") || field.equals("_")) {
+			sortKeyType = this.type;
+		} else {
+			sortKeyType = ((CompositeType<?>) this.type).getTypeAt(field);
+		}
+
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
 	/**
 	 * @return Configuration for the OutputFormat.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2c067fd..a2cde07 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -82,19 +82,19 @@ public abstract class Keys<T> {
 
 			this.keyExtractor = keyExtractor;
 			this.keyType = keyType;
-			
+
+			if(!keyType.isKeyType()) {
+				throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type");
+			}
+
 			// we have to handle a special case here:
-			// if the keyType is a tuple type, we need to select the full tuple with all its fields.
-			if(keyType.isTupleType()) {
+			// if the keyType is a composite type, we need to select the full type with all its fields.
+			if(keyType instanceof CompositeType) {
 				ExpressionKeys<K> ek = new ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType);
 				logicalKeyFields = ek.computeLogicalKeyPositions();
 			} else {
 				logicalKeyFields = new int[] {0};
 			}
-
-			if (!this.keyType.isKeyType()) {
-				throw new IllegalArgumentException("Invalid type of KeySelector keys");
-			}
 		}
 
 		public TypeInformation<K> getKeyType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 287bf82..4c6c952 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;
@@ -63,6 +64,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (field >= dataSet.getType().getArity()) {
 			throw new IllegalArgumentException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
+
 		// use int-based expression key to properly resolve nested tuples for grouping
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
 		this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -79,6 +82,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (!(dataSet.getType() instanceof CompositeType)) {
 			throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
 		}
+		isValidSortKeyType(field);
+
 		// resolve String-field to int using the expression keys
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
 		this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -95,6 +100,10 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
 			throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector grouping.");
 		}
+		TypeInformation<?> sortKeyType = keySelector.getKeyType();
+		if(!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Key type " + sortKeyType +" is not sortable.");
+		}
 
 		this.groupSortKeyPositions = keySelector.computeLogicalKeyPositions();
 		for (int i = 0; i < groupSortKeyPositions.length; i++) {
@@ -218,35 +227,22 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (field >= dataSet.getType().getArity()) {
 			throw new IllegalArgumentException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
+
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
 		addSortGroupInternal(ek, order);
 		return this;
 	}
-	
-	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
-		Preconditions.checkArgument(order != null, "Order can not be null");
-		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
-		
-		int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
-		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
-		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
-		int pos = newLength - additionalKeyPositions.length;
-		int off = newLength - additionalKeyPositions.length;
-		for(;pos < newLength; pos++) {
-			this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
-			this.groupSortOrders[pos] = order; // use the same order
-		}
-	}
-	
+
 	/**
 	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.</br>
 	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
 	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
-	 * 
+	 *
 	 * @param field The Tuple or Pojo field on which the group is sorted.
 	 * @param order The Order in which the specified field is sorted.
 	 * @return A SortedGrouping with specified order of group element.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.tuple.Tuple
 	 * @see Order
 	 */
@@ -257,9 +253,48 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (! (dataSet.getType() instanceof CompositeType)) {
 			throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
 		}
+		isValidSortKeyType(field);
+
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
 		addSortGroupInternal(ek, order);
 		return this;
 	}
+	
+	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
+		Preconditions.checkArgument(order != null, "Order can not be null");
+		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
+		
+		int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
+		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
+		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
+		int pos = newLength - additionalKeyPositions.length;
+		int off = newLength - additionalKeyPositions.length;
+		for(;pos < newLength; pos++) {
+			this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
+			this.groupSortOrders[pos] = order; // use the same order
+		}
+	}
+
+	private void isValidSortKeyType(int field) {
+		TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) dataSet.getType()).getTypeAt(field);
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
+	private void isValidSortKeyType(String field) {
+		TypeInformation<?> sortKeyType;
+
+		field = field.trim();
+		if(field.equals("*") || field.equals("_")) {
+			sortKeyType = this.getDataSet().getType();
+		} else {
+			sortKeyType = ((CompositeType<?>) this.getDataSet().getType()).getTypeAt(field);
+		}
+
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 1dcee24..2f3db7c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -118,8 +118,11 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	}
 
 	@Override
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(typeClass);
+	public boolean isSortKeyType() {
+		// Support for sorting POJOs that implement Comparable is not implemented yet.
+		// Since the order of fields in a POJO type is not well defined, sorting on fields
+		//   gives only some undefined order.
+		return false;
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index d1c2c9d..5051449 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -223,11 +223,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	}
 	
 	@Override
-	public boolean isKeyType() {
-		return isValidKeyType(this);
-	}
-
-	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof TupleTypeInfoBase) {
 			@SuppressWarnings("unchecked")
@@ -245,20 +240,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
 	}
 
-	private boolean isValidKeyType(TypeInformation<?> typeInfo) {
-		if(typeInfo instanceof TupleTypeInfoBase) {
-			TupleTypeInfoBase<?> tupleType = ((TupleTypeInfoBase<?>)typeInfo);
-			for(int i=0;i<tupleType.getArity();i++) {
-				if (!isValidKeyType(tupleType.getTypeAt(i))) {
-					return false;
-				}
-			}
-			return true;
-		} else  {
-			return typeInfo.isKeyType();
-		}
-	}
-
 	@Override
 	public String toString() {
 		StringBuilder bld = new StringBuilder("Tuple");

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 7a7ed14..37ad381 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -256,23 +256,6 @@ public class DataSinkTest {
 	}
 
 	@Test
-	public void testPojoSingleOrderFull() {
-
-		final ExecutionEnvironment env = ExecutionEnvironment
-				.getExecutionEnvironment();
-		DataSet<CustomType> pojoDs = env
-				.fromCollection(pojoData);
-
-		// should work
-		try {
-			pojoDs.writeAsText("/tmp/willNotHappen")
-					.sortLocalOutput("*", Order.ASCENDING);
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-
-	@Test
 	public void testPojoTwoOrder() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment
@@ -317,6 +300,35 @@ public class DataSinkTest {
 				.sortLocalOutput("notThere", Order.DESCENDING);
 	}
 
+	@Test(expected = InvalidProgramException.class)
+	public void testPojoSingleOrderFull() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("*", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testArrayOrderFull() {
+
+		List<Object[]> arrayData = new ArrayList<Object[]>();
+		arrayData.add(new Object[0]);
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Object[]> pojoDs = env
+				.fromCollection(arrayData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("*", Order.ASCENDING);
+	}
+
 	/**
 	 * Custom data type, for testing purposes.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index c958680..314695f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -24,13 +24,16 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,11 +51,23 @@ public class GroupingTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
+	private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo = new
+			TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				TypeExtractor.createTypeInfo(CustomType.class),
+				BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+			);
+
 	// LONG DATA
 	private final List<Long> emptyLongData = new ArrayList<Long>();
 	
 	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
+			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
+
 	
 	@Test  
 	public void testGroupByKeyFields1() {
@@ -187,7 +202,6 @@ public class GroupingTest {
 		// should not work, key out of tuple bounds
 		ds.groupBy("nested.myNonExistent");
 	}
-
 	
 	@Test
 	@SuppressWarnings("serial")
@@ -233,41 +247,67 @@ public class GroupingTest {
 			Assert.fail();
 		}
 	}
-	
-	@Test(expected=IllegalArgumentException.class)
+
+	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector3() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
-		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
-		// should not work
-		customDs.groupBy(
-				new KeySelector<GroupingTest.CustomType, CustomType>() {
-					@Override
-					public CustomType getKey(CustomType value) {
-						return value;
-				}
-		});
+
+		try {
+			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+			// should work: a custom POJO type is now accepted as a KeySelector key
+			customDs.groupBy(
+					new KeySelector<GroupingTest.CustomType, CustomType>() {
+						@Override
+						public CustomType getKey(CustomType value) {
+							return value;
+						}
+					});
+		} catch(Exception e) {
+			Assert.fail();
+		}
 	}
-	
-	@Test(expected=IllegalArgumentException.class)
+
+	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
+
+		try {
+			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+			// should work: a tuple containing a custom POJO is now accepted as a KeySelector key
+			customDs.groupBy(
+					new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() {
+						@Override
+						public Tuple2<Integer, CustomType> getKey(CustomType value) {
+							return new Tuple2<Integer, CustomType>(value.myInt, value);
+						}
+					});
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	@SuppressWarnings("serial")
+	public void testGroupByKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 		// should not work
 		customDs.groupBy(
-				new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() {
+				new KeySelector<GroupingTest.CustomType, CustomType2>() {
 					@Override
-					public Tuple2<Integer, CustomType> getKey(CustomType value) {
-						return new Tuple2<Integer, CustomType>(value.myInt, value);
-				}
-		});
+					public CustomType2 getKey(CustomType value) {
+						return new CustomType2();
+					}
+				});
 	}
 	
 	@Test
@@ -313,6 +353,30 @@ public class GroupingTest {
 		}).sortGroup(0, Order.ASCENDING);
 		
 	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortKeyFields4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(0)
+				.sortGroup(2, Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortKeyFields5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(0)
+				.sortGroup(3, Order.ASCENDING);
+	}
 	
 	@Test
 	public void testChainedGroupSortKeyFields() {
@@ -327,7 +391,166 @@ public class GroupingTest {
 			Assert.fail();
 		}
 	}
-	
+
+	@Test
+	public void testGroupSortByKeyExpression1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testGroupSortByKeyExpression2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testGroupSortByKeyExpression3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0")
+					.sortGroup("f2.myString", Order.ASCENDING)
+					.sortGroup("f1", Order.DESCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f2", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f1", Order.ASCENDING)
+				.sortGroup("f2", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f3", Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testGroupSortByKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() {
+							@Override
+							public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+								return value.f0;
+							}
+						}, Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, CustomType>() {
+							@Override
+							public CustomType getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+								return value.f2;
+							}
+						}, Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long[]>() {
+							@Override
+							public Long[] getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+								return value.f3;
+							}
+						}, Order.ASCENDING);
+	}
+
 
 	public static class CustomType implements Serializable {
 		
@@ -354,4 +577,11 @@ public class GroupingTest {
 			return myInt+","+myLong+","+myString;
 		}
 	}
+
+	public static class CustomType2 implements Serializable {
+
+		public int myInt;
+		public int[] myIntArray;
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
new file mode 100644
index 0000000..a4e2bbc
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SortPartitionTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo = new
+			TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				TypeExtractor.createTypeInfo(CustomType.class),
+				BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+			);
+
+	// LONG DATA
+	private final List<Long> emptyLongData = new ArrayList<Long>();
+
+	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
+			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
+
+
+	@Test
+	public void testSortPartitionPositionKeys1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.sortPartition(0, Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testSortPartitionPositionKeys2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs
+					.sortPartition(0, Order.ASCENDING)
+					.sortPartition(3, Order.DESCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithPositionKeys3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work
+		tupleDs.sortPartition(2, Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithPositionKeys4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work
+		tupleDs.sortPartition(3, Order.ASCENDING);
+	}
+
+	@Test
+	public void testSortPartitionExpressionKeys1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.sortPartition("f1", Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testSortPartitionExpressionKeys2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs
+					.sortPartition("f0", Order.ASCENDING)
+					.sortPartition("f2.nested.myInt", Order.DESCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithExpressionKeys3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work
+		tupleDs.sortPartition("f2.nested", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithExpressionKeys4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work
+		tupleDs.sortPartition("f3", Order.ASCENDING);
+	}
+
+	public static class CustomType implements Serializable {
+		
+		public static class Nest {
+			public int myInt;
+		}
+		private static final long serialVersionUID = 1L;
+		
+		public int myInt;
+		public long myLong;
+		public String myString;
+		public Nest nested;
+		
+		public CustomType() {};
+		
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+		
+		@Override
+		public String toString() {
+			return myInt+","+myLong+","+myString;
+		}
+	}
+
+	public static class CustomType2 implements Serializable {
+
+		public int myInt;
+		public int[] myIntArray;
+
+	}
+}


[3/5] flink git commit: [FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
index cf81550..372c0f7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -34,6 +35,8 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class SemanticPropUtilTest {
 	
 	private final TypeInformation<?> threeIntTupleType = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,
@@ -74,43 +77,43 @@ public class SemanticPropUtilTest {
 
 		int[] pMap = new int[] {3,0,4};
 		SingleInputSemanticProperties sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
 
 		pMap = new int[] {2,2,1,1};
 		sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 2);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 2);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 2);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 2);
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
 
 		pMap = new int[] {2,0};
 		sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
 
 		pMap = new int[] {2,0,1};
 		sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 6).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(6));
 
 		pMap = new int[] {2, 1};
 		sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) pojoInTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
 
 	}
 
@@ -121,88 +124,199 @@ public class SemanticPropUtilTest {
 		boolean[] iMap = new boolean[]{true, true, false, true, false, false};
 		DualInputSemanticProperties sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap,
 				fiveIntTupleType, fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 3).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(5));
 
 		pMap = new int[]{4,2,0,4,0,1};
 		iMap = new boolean[]{true, true, false, true, false, false};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).size() == 2);
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).size() == 2);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 1).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 4).size() == 2);
+		assertTrue(sp.getForwardingTargetFields(1, 0).size() == 2);
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(5));
 
 		pMap = new int[]{2,1,0,1};
 		iMap = new boolean[]{false, false, true, true};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, nestedTupleType, threeIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(5));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(5));
 
 		pMap = new int[]{1,0,0};
 		iMap = new boolean[]{false, false, true};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, nestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 4).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 5).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(6));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(7));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(8));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 5).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(7));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(8));
 
 		pMap = new int[]{4,2,1,0};
 		iMap = new boolean[]{true, false, true, false};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, pojoInTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 4).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 5).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 5).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(5));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(6));
 
 		pMap = new int[]{2,3,-1,0};
 		iMap = new boolean[]{true, true, false, true};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, intType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
 
 		pMap = new int[]{-1,-1};
 		iMap = new boolean[]{false, true};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, intType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 5).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(6));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 5).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(6));
 
 		pMap = new int[]{-1,-1};
 		iMap = new boolean[]{true, false};
 		sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, intType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 3).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 4).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(1, 5).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(1, 5).contains(6));
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Offset
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testAddSourceFieldOffset() {
+
+		SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		semProps.addForwardedField(0, 1);
+		semProps.addForwardedField(0, 4);
+		semProps.addForwardedField(2, 0);
+		semProps.addForwardedField(4, 3);
+		semProps.addReadFields(new FieldSet(0,3));
+
+		SemanticProperties offsetProps = SemanticPropUtil.addSourceFieldOffset(semProps, 5, 0);
+
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).size() == 2);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).contains(3));
+
+		assertTrue(offsetProps.getReadFields(0).size() == 2);
+		assertTrue(offsetProps.getReadFields(0).contains(0));
+		assertTrue(offsetProps.getReadFields(0).contains(3));
+
+		offsetProps = SemanticPropUtil.addSourceFieldOffset(semProps, 5, 3);
+
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 5).contains(0));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 7).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 7).contains(3));
+
+		assertTrue(offsetProps.getReadFields(0).size() == 2);
+		assertTrue(offsetProps.getReadFields(0).contains(3));
+		assertTrue(offsetProps.getReadFields(0).contains(6));
+
+		semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.addSourceFieldOffset(semProps, 1, 0);
+
+		semProps = new SingleInputSemanticProperties();
+		semProps.addForwardedField(0, 0);
+		semProps.addForwardedField(1, 2);
+		semProps.addForwardedField(2, 4);
+
+		offsetProps = SemanticPropUtil.addSourceFieldOffset(semProps, 3, 2);
+
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).contains(2));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).contains(4));
+
+	}
+
+	@Test
+	public void testAddSourceFieldOffsets() {
+
+		DualInputSemanticProperties semProps = new DualInputSemanticProperties();
+		semProps.addForwardedField(0, 0, 1);
+		semProps.addForwardedField(0, 3, 3);
+		semProps.addForwardedField(1, 1, 2);
+		semProps.addForwardedField(1, 1, 4);
+		semProps.addReadFields(0, new FieldSet(1, 2));
+		semProps.addReadFields(1, new FieldSet(0, 3, 4));
+
+		DualInputSemanticProperties offsetProps = SemanticPropUtil.addSourceFieldOffsets(semProps, 4, 3, 1, 2);
+
+		assertTrue(offsetProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 1).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(offsetProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(offsetProps.getForwardingTargetFields(0, 4).contains(3));
+
+		assertTrue(offsetProps.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(offsetProps.getForwardingTargetFields(1, 3).size() == 2);
+		assertTrue(offsetProps.getForwardingTargetFields(1, 3).contains(2));
+		assertTrue(offsetProps.getForwardingTargetFields(1, 3).contains(4));
+
+		assertTrue(offsetProps.getReadFields(0).size() == 2);
+		assertTrue(offsetProps.getReadFields(0).contains(2));
+		assertTrue(offsetProps.getReadFields(0).contains(3));
+		assertTrue(offsetProps.getReadFields(1).size() == 3);
+		assertTrue(offsetProps.getReadFields(1).contains(2));
+		assertTrue(offsetProps.getReadFields(1).contains(5));
+		assertTrue(offsetProps.getReadFields(1).contains(6));
+
+		semProps = new DualInputSemanticProperties();
+		SemanticPropUtil.addSourceFieldOffsets(semProps, 4, 3, 2, 2);
 
 	}
 
@@ -216,9 +330,9 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 	}
 
 	@Test
@@ -227,23 +341,23 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 
 		forwardedFields[0] = "2;3;0";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 
 		forwardedFields[0] = "2;3;0;";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 	}
 
 	@Test
@@ -252,9 +366,9 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 	}
 
 	@Test
@@ -263,8 +377,8 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
 	}
 
 	@Test
@@ -273,14 +387,14 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
 
 		forwardedFields[0] = "0->0;1->2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, fiveIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
 	}
 
 	@Test
@@ -289,8 +403,8 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
 	}
 	
 	@Test
@@ -299,10 +413,10 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
 	}
 
 	@Test
@@ -311,19 +425,19 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, intType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 
 		forwardedFields[0] = "*->f2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, intType, threeIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
 
 		forwardedFields[0] = "*->*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, intType, intType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
 	}
 
 	@Test
@@ -332,19 +446,19 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
 
 		forwardedFields[0] = "*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
 
 	}
 
@@ -353,79 +467,79 @@ public class SemanticPropUtilTest {
 		String[] forwardedFields = { "f0->f0.f0; f1->f0.f1; f2->f0.f2" };
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
 
 		forwardedFields[0] = "f0.f0->f1.f0.f2; f0.f1->f2; f2->f1.f2; f1->f0";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(6));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
 
 		forwardedFields[0] = "0.0->1.0.2; 0.1->2; 2->1.2; 1->0";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(6));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
 
 		forwardedFields[0] = "f1.f0.*->f0.*; f0->f2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
 
 		forwardedFields[0] = "1.0.*->0.*; 0->2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
 
 		forwardedFields[0] = "f1.f0->f0; f0->f2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
 
 		forwardedFields[0] = "1.0->0; 0->2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, nestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
 
 		forwardedFields[0] = "f1.f0.f1; f1.f1; f2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
 
 		forwardedFields[0] = "f1.f0.*; f1.f2";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, deepNestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 6).size() == 0);
 
 	}
 
@@ -435,118 +549,118 @@ public class SemanticPropUtilTest {
 		String[] forwardedFields = { "int1->int2; int3->int1; string1 " };
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, pojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
 
 		forwardedFields[0] = "f1->int1; f0->int3 ";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, pojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
 
 		forwardedFields[0] = "int1->f2; int2->f0; int3->f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, threeIntTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
 
 		forwardedFields[0] = "*->pojo1.*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
 
 		forwardedFields[0] = "*->pojo1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
 
 		forwardedFields[0] = "int1; string1; int2->pojo1.int3";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(5));
 
 		forwardedFields[0] = "pojo1.*->f2.*; int1->f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedPojoType, pojoInTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
 
 		forwardedFields[0] = "f2.*->*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoInTupleType, pojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
 
 		forwardedFields[0] = "pojo1->f2; int1->f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedPojoType, pojoInTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(5));
 
 		forwardedFields[0] = "f2->*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoInTupleType, pojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(3));
 
 		forwardedFields[0] = "int2; string1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, pojoType, pojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 
 		forwardedFields[0] = "pojo1.int1; string1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedPojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
 
 		forwardedFields[0] = "pojo1.*; int1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedPojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
 
 		forwardedFields[0] = "pojo1; int1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, nestedPojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
 	}
 
 	@Test(expected = InvalidSemanticAnnotationException.class)
@@ -664,9 +778,9 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 	}
 
 	@Test
@@ -675,17 +789,17 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 
 		nonForwardedFields[0] = "f1;f2;";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 	}
 
 	@Test
@@ -694,9 +808,9 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
 	}
 
 	@Test
@@ -705,9 +819,9 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
 	}
 
 	@Test
@@ -716,36 +830,36 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, deepNestedTupleType, deepNestedTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
 
 		nonForwardedFields[0] = "f1.f0; f1.f2; f0";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, deepNestedTupleType, deepNestedTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 6).contains(6));
 
 		nonForwardedFields[0] = "f2; f1.f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, deepNestedTupleType, deepNestedTupleType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 6).size() == 0);
 
 	}
 
@@ -755,10 +869,10 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, pojoType, pojoType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
 
 	}
 
@@ -768,22 +882,22 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, nestedPojoType, nestedPojoType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
+		assertTrue(sp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 5).contains(5));
 
 		nonForwardedFields[0] = "pojo1.int2; string1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, nestedPojoType, nestedPojoType);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 5).size() == 0);
 	}
 
 	@Test(expected = InvalidSemanticAnnotationException.class)
@@ -847,9 +961,9 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, threeIntTupleType, threeIntTupleType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 2);
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(1));
+		assertTrue(fs.size() == 2);
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(1));
 	}
 	
 	@Test
@@ -859,18 +973,18 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, threeIntTupleType, threeIntTupleType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 2);
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(1));
+		assertTrue(fs.size() == 2);
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(1));
 
 		readFields[0] = "f1;f2;";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, threeIntTupleType, threeIntTupleType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 2);
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(1));
+		assertTrue(fs.size() == 2);
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(1));
 	}
 
 	@Test
@@ -880,9 +994,9 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, threeIntTupleType, threeIntTupleType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 2);
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(1));
+		assertTrue(fs.size() == 2);
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(1));
 	}
 
 	@Test
@@ -892,15 +1006,15 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, intType, intType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 1);
-		Assert.assertTrue(fs.contains(0));
+		assertTrue(fs.size() == 1);
+		assertTrue(fs.contains(0));
 
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, intType, fiveIntTupleType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 1);
-		Assert.assertTrue(fs.contains(0));
+		assertTrue(fs.size() == 1);
+		assertTrue(fs.contains(0));
 	}
 
 	@Test
@@ -910,21 +1024,21 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedTupleType, intType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 3);
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(4));
+		assertTrue(fs.size() == 3);
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(4));
 
 		readFields[0] = "f0;f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedTupleType, intType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 4);
-		Assert.assertTrue(fs.contains(0));
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
+		assertTrue(fs.size() == 4);
+		assertTrue(fs.contains(0));
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
 	}
 
 	@Test
@@ -934,23 +1048,23 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedTupleType, intType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 5);
-		Assert.assertTrue(fs.contains(0));
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
-		Assert.assertTrue(fs.contains(4));
+		assertTrue(fs.size() == 5);
+		assertTrue(fs.contains(0));
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
+		assertTrue(fs.contains(4));
 
 		readFields[0] = "f0.*;f1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedTupleType, intType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 4);
-		Assert.assertTrue(fs.contains(0));
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
+		assertTrue(fs.size() == 4);
+		assertTrue(fs.contains(0));
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
 
 	}
 
@@ -961,20 +1075,20 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, pojoType, threeIntTupleType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 2);
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(3));
+		assertTrue(fs.size() == 2);
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(3));
 
 		readFields[0] = "*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, pojoType, intType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 4);
-		Assert.assertTrue(fs.contains(0));
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
+		assertTrue(fs.size() == 4);
+		assertTrue(fs.contains(0));
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
 	}
 
 	@Test
@@ -984,32 +1098,32 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedPojoType, intType);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 3);
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(4));
-		Assert.assertTrue(fs.contains(5));
+		assertTrue(fs.size() == 3);
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(4));
+		assertTrue(fs.contains(5));
 
 		readFields[0] = "pojo1.*";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedPojoType, intType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 4);
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
-		Assert.assertTrue(fs.contains(4));
+		assertTrue(fs.size() == 4);
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
+		assertTrue(fs.contains(4));
 
 		readFields[0] = "pojo1";
 		sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, nestedPojoType, intType);
 
 		fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 4);
-		Assert.assertTrue(fs.contains(1));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(3));
-		Assert.assertTrue(fs.contains(4));
+		assertTrue(fs.size() == 4);
+		assertTrue(fs.contains(1));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(3));
+		assertTrue(fs.contains(4));
 	}
 
 	@Test
@@ -1019,10 +1133,10 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, pojoInTupleType, pojo2Type);
 
 		FieldSet fs = sp.getReadFields(0);
-		Assert.assertTrue(fs.size() == 3);
-		Assert.assertTrue(fs.contains(0));
-		Assert.assertTrue(fs.contains(2));
-		Assert.assertTrue(fs.contains(5));
+		assertTrue(fs.size() == 3);
+		assertTrue(fs.contains(0));
+		assertTrue(fs.contains(2));
+		assertTrue(fs.contains(5));
 	}
 
 	@Test(expected = InvalidSemanticAnnotationException.class)
@@ -1044,12 +1158,12 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, forwardedFieldsFirst, forwardedFieldsSecond, null,
 				null, null, null, fourIntTupleType, fourIntTupleType, fourIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
 
 		forwardedFieldsFirst[0] = "f1->f0;f3->f1";
 		forwardedFieldsSecond[0] = "*->f2.*";
@@ -1057,14 +1171,14 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, forwardedFieldsFirst, forwardedFieldsSecond, null,
 				null, null, null, fourIntTupleType, pojoType, pojoInTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 3).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(3));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).contains(4));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 1).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(3));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).contains(4));
+		assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
 
 		forwardedFieldsFirst[0] = "f1.f0.f2->int1; f2->pojo1.int3";
 		forwardedFieldsSecond[0] = "string1; int2->pojo1.int1; int1->pojo1.int2";
@@ -1072,17 +1186,17 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, forwardedFieldsFirst, forwardedFieldsSecond, null,
 				null, null, null, deepNestedTupleType, pojoType, nestedPojoType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 3).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 4).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 5).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 6).contains(3));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 3).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 6).contains(3));
+		assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
 
 		String[] forwardedFieldsFirst2 = { "f1.f0.f2->int1", "f2->pojo1.int3" };
 		String[] forwardedFieldsSecond2 = { "string1", "int2->pojo1.int1", "int1->pojo1.int2" };
@@ -1090,17 +1204,17 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, forwardedFieldsFirst2, forwardedFieldsSecond2, null,
 				null, null, null, deepNestedTupleType, pojoType, nestedPojoType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 3).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 4).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 5).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 6).contains(3));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 3).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 6).contains(3));
+		assertTrue(dsp.getForwardingTargetFields(1, 0).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
 	}
 
 
@@ -1112,12 +1226,12 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				nonNorwardedFieldsFirst, nonNorwardedFieldsSecond, null, null, threeIntTupleType, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).contains(2));
 
 		nonNorwardedFieldsFirst[0] = "f1";
 		nonNorwardedFieldsSecond[0] = "";
@@ -1125,12 +1239,12 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				nonNorwardedFieldsFirst, null, null, null, threeIntTupleType, fiveIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
 
 		nonNorwardedFieldsFirst[0] = "";
 		nonNorwardedFieldsSecond[0] = "f2;f0";
@@ -1138,33 +1252,33 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, nonNorwardedFieldsSecond, null, null, fiveIntTupleType, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).size() == 0);
 
 		String[] nonForwardedFields = {"f1", "f3"};
 		dsp = new DualInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				nonForwardedFields, null, null, null, fiveIntTupleType, threeIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 3).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(0, 4).contains(4));
 
 		dsp = new DualInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, nonForwardedFields, null, null, threeIntTupleType, fiveIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 0).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 4).contains(4));
+		assertTrue(dsp.getForwardingTargetFields(1, 0).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 2).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 4).contains(4));
 	}
 
 	@Test(expected = InvalidSemanticAnnotationException.class)
@@ -1193,11 +1307,11 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, null, readFieldsFirst, readFieldsSecond, threeIntTupleType, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getReadFields(0).size() == 2);
-		Assert.assertTrue(dsp.getReadFields(0).contains(1));
-		Assert.assertTrue(dsp.getReadFields(0).contains(2));
-		Assert.assertTrue(dsp.getReadFields(1).size() == 1);
-		Assert.assertTrue(dsp.getReadFields(1).contains(0));
+		assertTrue(dsp.getReadFields(0).size() == 2);
+		assertTrue(dsp.getReadFields(0).contains(1));
+		assertTrue(dsp.getReadFields(0).contains(2));
+		assertTrue(dsp.getReadFields(1).size() == 1);
+		assertTrue(dsp.getReadFields(1).contains(0));
 
 		readFieldsFirst[0] = "f0.*; f2";
 		readFieldsSecond[0] = "int1; string1";
@@ -1205,14 +1319,14 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, null, readFieldsFirst, readFieldsSecond, nestedTupleType, pojoType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getReadFields(0).size() == 4);
-		Assert.assertTrue(dsp.getReadFields(0).contains(0));
-		Assert.assertTrue(dsp.getReadFields(0).contains(1));
-		Assert.assertTrue(dsp.getReadFields(0).contains(2));
-		Assert.assertTrue(dsp.getReadFields(0).contains(4));
-		Assert.assertTrue(dsp.getReadFields(1).size() == 2);
-		Assert.assertTrue(dsp.getReadFields(1).contains(0));
-		Assert.assertTrue(dsp.getReadFields(1).contains(3));
+		assertTrue(dsp.getReadFields(0).size() == 4);
+		assertTrue(dsp.getReadFields(0).contains(0));
+		assertTrue(dsp.getReadFields(0).contains(1));
+		assertTrue(dsp.getReadFields(0).contains(2));
+		assertTrue(dsp.getReadFields(0).contains(4));
+		assertTrue(dsp.getReadFields(1).size() == 2);
+		assertTrue(dsp.getReadFields(1).contains(0));
+		assertTrue(dsp.getReadFields(1).contains(3));
 
 		readFieldsFirst[0] = "pojo1.int2; string1";
 		readFieldsSecond[0] = "f2.int2";
@@ -1220,25 +1334,25 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, null, readFieldsFirst, readFieldsSecond, nestedPojoType, pojoInTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getReadFields(0).size() == 2);
-		Assert.assertTrue(dsp.getReadFields(0).contains(2));
-		Assert.assertTrue(dsp.getReadFields(0).contains(5));
-		Assert.assertTrue(dsp.getReadFields(1).size() == 1);
-		Assert.assertTrue(dsp.getReadFields(1).contains(3));
+		assertTrue(dsp.getReadFields(0).size() == 2);
+		assertTrue(dsp.getReadFields(0).contains(2));
+		assertTrue(dsp.getReadFields(0).contains(5));
+		assertTrue(dsp.getReadFields(1).size() == 1);
+		assertTrue(dsp.getReadFields(1).contains(3));
 
 		String[] readFields = {"f0", "f2", "f4"};
 		dsp = new DualInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, null, null,
 				null, null, readFields, readFields, fiveIntTupleType, fiveIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(dsp.getReadFields(0).size() == 3);
-		Assert.assertTrue(dsp.getReadFields(0).contains(0));
-		Assert.assertTrue(dsp.getReadFields(0).contains(2));
-		Assert.assertTrue(dsp.getReadFields(0).contains(4));
-		Assert.assertTrue(dsp.getReadFields(1).size() == 3);
-		Assert.assertTrue(dsp.getReadFields(1).contains(0));
-		Assert.assertTrue(dsp.getReadFields(1).contains(2));
-		Assert.assertTrue(dsp.getReadFields(1).contains(4));
+		assertTrue(dsp.getReadFields(0).size() == 3);
+		assertTrue(dsp.getReadFields(0).contains(0));
+		assertTrue(dsp.getReadFields(0).contains(2));
+		assertTrue(dsp.getReadFields(0).contains(4));
+		assertTrue(dsp.getReadFields(1).size() == 3);
+		assertTrue(dsp.getReadFields(1).contains(0));
+		assertTrue(dsp.getReadFields(1).contains(2));
+		assertTrue(dsp.getReadFields(1).contains(4));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1252,11 +1366,11 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, readFields, threeIntTupleType, fiveIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(sp.getReadFields(0).size() == 2);
-		Assert.assertTrue(sp.getReadFields(0).contains(0));
-		Assert.assertTrue(sp.getReadFields(0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getReadFields(0).size() == 2);
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(2));
 	}
 
 	@Test
@@ -1266,12 +1380,12 @@ public class SemanticPropUtilTest {
 		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
 		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, readFields, threeIntTupleType, threeIntTupleType);
 
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
-		Assert.assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
-		Assert.assertTrue(sp.getReadFields(0).size() == 2);
-		Assert.assertTrue(sp.getReadFields(0).contains(0));
-		Assert.assertTrue(sp.getReadFields(0).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(sp.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(sp.getReadFields(0).size() == 2);
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(2));
 	}
 
 	@Test
@@ -1284,17 +1398,17 @@ public class SemanticPropUtilTest {
 		SemanticPropUtil.getSemanticPropsDualFromString(dsp, forwardedFieldsFirst, forwardedFieldsSecond, null,
 				null, readFieldsFirst, readFieldsSecond, fourIntTupleType, fourIntTupleType, fourIntTupleType);
 
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 1).contains(2));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 2).contains(3));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 2).contains(0));
-		Assert.assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
-		Assert.assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
-		Assert.assertTrue(dsp.getReadFields(0).size() == 2);
-		Assert.assertTrue(dsp.getReadFields(0).contains(0));
-		Assert.assertTrue(dsp.getReadFields(0).contains(2));
-		Assert.assertTrue(dsp.getReadFields(1).size() == 1);
-		Assert.assertTrue(dsp.getReadFields(1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(dsp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(dsp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(dsp.getForwardingTargetFields(1, 2).contains(0));
+		assertTrue(dsp.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(dsp.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(dsp.getReadFields(0).size() == 2);
+		assertTrue(dsp.getReadFields(0).contains(0));
+		assertTrue(dsp.getReadFields(0).contains(2));
+		assertTrue(dsp.getReadFields(1).size() == 1);
+		assertTrue(dsp.getReadFields(1).contains(1));
 
 	}
 


[2/5] flink git commit: [FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index d686633..60754e6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -21,6 +21,12 @@ package org.apache.flink.api.java.operator;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +39,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
 
+import static org.junit.Assert.assertTrue;
+
 @SuppressWarnings("serial")
 public class CoGroupOperatorTest {
 
@@ -348,4 +356,128 @@ public class CoGroupOperatorTest {
 					}
 				);
 	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestCoGroupFunction1());
+
+		SemanticProperties semProps = coGroupOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(6));
+
+		assertTrue(semProps.getReadFields(1).size() == 2);
+		assertTrue(semProps.getReadFields(1).contains(3));
+		assertTrue(semProps.getReadFields(1).contains(5));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestCoGroupFunction2())
+				.withForwardedFieldsFirst("2;4->0")
+				.withForwardedFieldsSecond("0->4;1;1->3");
+
+		SemanticProperties semProps = coGroupOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(3));
+		assertTrue(semProps.getReadFields(0).contains(4));
+
+		assertTrue(semProps.getReadFields(1) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3")
+	@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
+	@FunctionAnnotation.ReadFieldsFirst("0;2;4")
+	@FunctionAnnotation.ReadFieldsSecond("1;3")
+	public static class DummyTestCoGroupFunction1
+			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+						Tuple5<Integer, Long, String, Long, Integer>,
+						Tuple5<Integer, Long, String, Long, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+							Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+							Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFieldsFirst("0;1;2")
+	public static class DummyTestCoGroupFunction2
+			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+							Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+							Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
new file mode 100644
index 0000000..4870d29
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class GroupCombineOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction1());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction1());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector7() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction4());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	public static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
new file mode 100644
index 0000000..0bfe566
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class GroupReduceOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector7() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction4());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	public static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 3d4551d..f1aadca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -23,20 +23,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 @SuppressWarnings("serial")
 public class JoinOperatorTest {
 
@@ -298,7 +307,7 @@ public class JoinOperatorTest {
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		try {
 			TypeInformation<?> t = ds1.join(ds2).where("f0.myInt").equalTo(4).getType();
-			Assert.assertTrue("not a composite type", t instanceof CompositeType);
+			assertTrue("not a composite type", t instanceof CompositeType);
 		} catch(Exception e) {
 			e.printStackTrace();
 			Assert.fail();
@@ -946,7 +955,136 @@ public class JoinOperatorTest {
 		.projectFirst(0)
 		.projectSecond(-1);
 	}
-	
+
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?,?,?> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestJoinFunction1());
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(6));
+
+		assertTrue(semProps.getReadFields(1).size() == 2);
+		assertTrue(semProps.getReadFields(1).contains(3));
+		assertTrue(semProps.getReadFields(1).contains(5));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?,?,?> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestJoinFunction2())
+					.withForwardedFieldsFirst("2;4->0")
+					.withForwardedFieldsSecond("0->4;1;1->3");
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(3));
+		assertTrue(semProps.getReadFields(0).contains(4));
+
+		assertTrue(semProps.getReadFields(1) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?, ?, ? extends Tuple> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.projectFirst(2)
+				.projectSecond(0, 0, 3)
+				.projectFirst(0, 4)
+				.projectSecond(2);
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(5));
+
+		assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1, 2).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).contains(6));
+		assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 0);
+
+	}
+
 	/*
 	 * ####################################################################
 	 */
@@ -1001,8 +1139,8 @@ public class JoinOperatorTest {
 		public NestedCustomType nested;
 		public String myString;
 		public Object nothing;
-	//	public List<String> countries; need Kryo to support this
-	//	public Writable interfaceTest; need kryo
+		public List<String> countries;
+		public Writable interfaceTest;
 		
 		public CustomType() {};
 		
@@ -1010,6 +1148,8 @@ public class JoinOperatorTest {
 			myInt = i;
 			myLong = l;
 			myString = s;
+			countries = null;
+			interfaceTest = null;
 			nested = new NestedCustomType(i, l, s);
 		}
 		
@@ -1046,5 +1186,40 @@ public class JoinOperatorTest {
 		}
 	}
 
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3")
+	@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
+	@FunctionAnnotation.ReadFieldsFirst("0;2;4")
+	@FunctionAnnotation.ReadFieldsSecond("1;3")
+	public static class DummyTestJoinFunction1
+			implements JoinFunction<Tuple5<Integer, Long, String, Long, Integer>,
+									Tuple5<Integer, Long, String, Long, Integer>,
+									Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> join(
+				Tuple5<Integer, Long, String, Long, Integer> first,
+				Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ReadFieldsFirst("0;1;2")
+	public static class DummyTestJoinFunction2
+			implements JoinFunction<Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> join(
+				Tuple5<Integer, Long, String, Long, Integer> first,
+				Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
new file mode 100644
index 0000000..dafc1f2
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the semantic properties reported by a {@link ReduceOperator} whose input is grouped
+ * with a {@link KeySelector}. The selector used here declares a two-field key, and the
+ * assertions expect every source (input) field index to be shifted by two.
+ */
+@SuppressWarnings("serial")
+public class ReduceOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo =
+			new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	/** Forwarded and read fields are both declared via function annotations. */
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				emptyInput().groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertForwardingTargets(semProps, 0);
+		assertForwardingTargets(semProps, 1);
+		assertForwardingTargets(semProps, 2, 4);
+		assertForwardingTargets(semProps, 3, 1, 3);
+		assertForwardingTargets(semProps, 4, 2);
+		assertForwardingTargets(semProps, 5);
+		assertForwardingTargets(semProps, 6);
+
+		assertNoForwardingSource(semProps, 0);
+		assertForwardingSource(semProps, 1, 3);
+		assertForwardingSource(semProps, 2, 4);
+		assertForwardingSource(semProps, 3, 3);
+		assertForwardingSource(semProps, 4, 2);
+
+		assertReadFields(semProps, 2, 5, 6);
+	}
+
+	/** Forwarded fields are declared on the operator, read fields via annotation. */
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				emptyInput().groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction2())
+						.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertForwardingTargets(semProps, 0);
+		assertForwardingTargets(semProps, 1);
+		assertForwardingTargets(semProps, 2, 4);
+		assertForwardingTargets(semProps, 3, 1, 3);
+		assertForwardingTargets(semProps, 4, 2);
+		assertForwardingTargets(semProps, 5);
+		assertForwardingTargets(semProps, 6);
+
+		assertNoForwardingSource(semProps, 0);
+		assertForwardingSource(semProps, 1, 3);
+		assertForwardingSource(semProps, 2, 4);
+		assertForwardingSource(semProps, 3, 3);
+		assertForwardingSource(semProps, 4, 2);
+
+		assertReadFields(semProps, 2, 5, 6);
+	}
+
+	/** Forwarded fields are declared on the operator; no read fields are declared. */
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				emptyInput().groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertForwardingTargets(semProps, 0);
+		assertForwardingTargets(semProps, 1);
+		assertForwardingTargets(semProps, 2);
+		assertForwardingTargets(semProps, 3);
+		assertForwardingTargets(semProps, 4, 2);
+		assertForwardingTargets(semProps, 5, 1, 3);
+		assertForwardingTargets(semProps, 6, 0);
+
+		assertForwardingSource(semProps, 0, 6);
+		assertForwardingSource(semProps, 1, 5);
+		assertForwardingSource(semProps, 2, 4);
+		assertForwardingSource(semProps, 3, 5);
+		assertNoForwardingSource(semProps, 4);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	/** Forwarded fields are derived from a non-forwarded-fields annotation. */
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				emptyInput().groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction4());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertForwardingTargets(semProps, 0);
+		assertForwardingTargets(semProps, 1);
+		assertForwardingTargets(semProps, 2, 0);
+		assertForwardingTargets(semProps, 3, 1);
+		assertForwardingTargets(semProps, 4);
+		assertForwardingTargets(semProps, 5, 3);
+		assertForwardingTargets(semProps, 6);
+
+		assertForwardingSource(semProps, 0, 2);
+		assertForwardingSource(semProps, 1, 3);
+		assertNoForwardingSource(semProps, 2);
+		assertForwardingSource(semProps, 3, 5);
+		assertNoForwardingSource(semProps, 4);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Assertion helpers
+	// ------------------------------------------------------------------------
+
+	/** Creates an empty tuple data set on a fresh execution environment. */
+	private DataSet<Tuple5<Integer, Long, String, Long, Integer>> emptyInput() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		return env.fromCollection(emptyTupleData, tupleTypeInfo);
+	}
+
+	/**
+	 * Asserts that the given source field of input 0 is forwarded to exactly the given
+	 * target fields. Passing no target fields asserts that the source is not forwarded.
+	 */
+	private static void assertForwardingTargets(SemanticProperties props, int sourceField, int... expectedTargets) {
+		assertTrue(props.getForwardingTargetFields(0, sourceField).size() == expectedTargets.length);
+		for (int target : expectedTargets) {
+			assertTrue(props.getForwardingTargetFields(0, sourceField).contains(target));
+		}
+	}
+
+	/** Asserts that the given target field is forwarded from the given source field of input 0. */
+	private static void assertForwardingSource(SemanticProperties props, int targetField, int expectedSource) {
+		assertTrue(props.getForwardingSourceField(0, targetField) == expectedSource);
+	}
+
+	/** Asserts that the given target field has no forwarding source on input 0. */
+	private static void assertNoForwardingSource(SemanticProperties props, int targetField) {
+		assertTrue(props.getForwardingSourceField(0, targetField) < 0);
+	}
+
+	/** Asserts that the read fields of input 0 are exactly the given (distinct) fields. */
+	private static void assertReadFields(SemanticProperties props, int... expectedFields) {
+		assertTrue(props.getReadFields(0).size() == expectedFields.length);
+		for (int field : expectedFields) {
+			assertTrue(props.getReadFields(0).contains(field));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test functions
+	// ------------------------------------------------------------------------
+
+	/** Key selector stub declaring a two-field key; returns an empty key tuple. */
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			final Tuple2<Long, Integer> emptyKey = new Tuple2<Long, Integer>();
+			return emptyKey;
+		}
+	}
+
+	/** Reduce stub annotated with forwarded fields and read fields. */
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyReduceFunction1 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(
+				Tuple5<Integer, Long, String, Long, Integer> v1,
+				Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+					new Tuple5<Integer, Long, String, Long, Integer>();
+			return emptyResult;
+		}
+	}
+
+	/** Reduce stub annotated with read fields only. */
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyReduceFunction2 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(
+				Tuple5<Integer, Long, String, Long, Integer> v1,
+				Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+					new Tuple5<Integer, Long, String, Long, Integer>();
+			return emptyResult;
+		}
+	}
+
+	/** Reduce stub without any semantic annotation. */
+	public static class DummyReduceFunction3 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(
+				Tuple5<Integer, Long, String, Long, Integer> v1,
+				Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+					new Tuple5<Integer, Long, String, Long, Integer>();
+			return emptyResult;
+		}
+	}
+
+	/** Reduce stub annotated with non-forwarded fields. */
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyReduceFunction4 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(
+				Tuple5<Integer, Long, String, Long, Integer> v1,
+				Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+					new Tuple5<Integer, Long, String, Long, Integer>();
+			return emptyResult;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index c5067f9..9eb9a37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -1005,6 +1005,64 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
 	}
 
+	/**
+	 * Tests that semantic properties are correctly adapted when using selector keys:
+	 * a group-reduce on a key-selector grouping declares field 0 as forwarded and is
+	 * followed by a second group-reduce that groups on field 0.
+	 */
+	@Test
+	public void testGroupReduceSelectorKeysWithSemProps() throws Exception {
+
+		// selector key computed from four of the five tuple fields
+		final KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Long> selectorKey =
+				new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Long>() {
+					@Override
+					public Long getKey(Tuple5<Integer, Long, Integer, String, Long> v) throws Exception {
+						return (v.f0*v.f1)-(v.f2*v.f4);
+					}
+				};
+
+		// first reducer: emits every record of the group unchanged
+		final GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> passThrough =
+				new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>>() {
+					@Override
+					public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
+						for (Tuple5<Integer, Long, Integer, String, Long> record : values) {
+							out.collect(record);
+						}
+					}
+				};
+
+		// second reducer: emits (last seen f0, sum of f1) once per group
+		final GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> sumPerGroup =
+				new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+					@Override
+					public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
+						int groupKey = 0;
+						long sum = 0;
+						for (Tuple5<Integer, Long, Integer, String, Long> record : values) {
+							groupKey = record.f0;
+							sum += record.f1;
+						}
+						out.collect(new Tuple2<Integer, Long>(groupKey, sum));
+					}
+				};
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> reduceDs = ds
+				// group by selector key
+				.groupBy(selectorKey)
+				.reduceGroup(passThrough)
+				// add forward field information
+				.withForwardedFields("0")
+				// group again and reduce
+				.groupBy(0)
+				.reduceGroup(sumPerGroup);
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n";
+
+	}
+
 	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
 		@Override
 		public void reduce(


[4/5] flink git commit: [FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

Posted by fh...@apache.org.
[FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

This closes #532


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f39aec82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f39aec82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f39aec82

Branch: refs/heads/master
Commit: f39aec82d6cd1286f129b11366d101cb646716ee
Parents: dda8565
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 24 15:18:21 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 3 20:41:40 2015 +0200

----------------------------------------------------------------------
 .../api/java/functions/SemanticPropUtil.java    |   79 ++
 .../api/java/operators/CoGroupOperator.java     |   25 +
 .../java/operators/GroupCombineOperator.java    |   23 +
 .../api/java/operators/GroupReduceOperator.java |   22 +
 .../flink/api/java/operators/JoinOperator.java  |   23 +
 .../api/java/operators/ReduceOperator.java      |   22 +
 .../translation/TwoKeyExtractingMapper.java     |    3 +-
 .../java/functions/SemanticPropUtilTest.java    | 1008 ++++++++++--------
 .../api/java/operator/CoGroupOperatorTest.java  |  132 +++
 .../java/operator/GroupCombineOperatorTest.java |  345 ++++++
 .../java/operator/GroupReduceOperatorTest.java  |  345 ++++++
 .../api/java/operator/JoinOperatorTest.java     |  183 +++-
 .../api/java/operator/ReduceOperatorTest.java   |  238 +++++
 .../javaApiOperators/GroupReduceITCase.java     |   58 +
 14 files changed, 2054 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 3343671..4569be3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -153,6 +153,85 @@ public class SemanticPropUtil {
 		return dsp;
 	}
 
+	/**
+	 * Builds new single-input SemanticProperties from the given ones by shifting every source
+	 * (input) field index by a fixed offset. This applies to the read fields and to the source
+	 * side of every forwarded field; target field indexes are copied unchanged.
+	 *
+	 * @param props The SemanticProperties to which the offset is added.
+	 * @param numInputFields The original number of fields of the input. Forwarding information
+	 *                       is copied for the source fields 0 to numInputFields - 1.
+	 * @param offset The offset that is added to each input field index.
+	 * @return New SemanticProperties with added offset.
+	 */
+	public static SingleInputSemanticProperties addSourceFieldOffset(SingleInputSemanticProperties props,
+																		int numInputFields, int offset) {
+
+		final SingleInputSemanticProperties shiftedProps = new SingleInputSemanticProperties();
+
+		// shift the read fields; if none are set, the result has none either
+		final FieldSet originalReads = props.getReadFields(0);
+		if (originalReads != null) {
+			FieldSet shiftedReads = new FieldSet();
+			for (Integer readField : originalReads) {
+				shiftedReads = shiftedReads.addField(readField + offset);
+			}
+			shiftedProps.addReadFields(shiftedReads);
+		}
+
+		// shift the source index of every forwarded field, keep the target index
+		for (int source = 0; source < numInputFields; source++) {
+			final int shiftedSource = source + offset;
+			for (Integer target : props.getForwardingTargetFields(0, source)) {
+				shiftedProps.addForwardedField(shiftedSource, target);
+			}
+		}
+
+		return shiftedProps;
+	}
+
+	/**
+	 * Builds new dual-input SemanticProperties from the given ones by shifting every source
+	 * (input) field index of each input by that input's offset. This applies to the read
+	 * fields and to the source side of every forwarded field; target field indexes are
+	 * copied unchanged.
+	 *
+	 * @param props The SemanticProperties to which the offset is added.
+	 * @param numInputFields1 The original number of fields of the first input.
+	 * @param numInputFields2 The original number of fields of the second input.
+	 * @param offset1 The offset that is added to each input field index of the first input.
+	 * @param offset2 The offset that is added to each input field index of the second input.
+	 * @return New SemanticProperties with added offsets.
+	 */
+	public static DualInputSemanticProperties addSourceFieldOffsets(DualInputSemanticProperties props,
+																	int numInputFields1, int numInputFields2,
+																	int offset1, int offset2) {
+
+		// per-input parameters, indexed by input number (0 = first input, 1 = second input)
+		final int[] fieldCounts = { numInputFields1, numInputFields2 };
+		final int[] offsets = { offset1, offset2 };
+
+		final DualInputSemanticProperties shiftedProps = new DualInputSemanticProperties();
+
+		// add offsets to the read fields, first input before second input
+		for (int input = 0; input < 2; input++) {
+			final FieldSet originalReads = props.getReadFields(input);
+			if (originalReads == null) {
+				continue;
+			}
+			FieldSet shiftedReads = new FieldSet();
+			for (Integer readField : originalReads) {
+				shiftedReads = shiftedReads.addField(readField + offsets[input]);
+			}
+			shiftedProps.addReadFields(input, shiftedReads);
+		}
+
+		// add offsets to the forwarded fields, first input before second input
+		for (int input = 0; input < 2; input++) {
+			for (int source = 0; source < fieldCounts[input]; source++) {
+				final int shiftedSource = source + offsets[input];
+				for (Integer target : props.getForwardingTargetFields(input, source)) {
+					shiftedProps.addForwardedField(input, shiftedSource, target);
+				}
+			}
+		}
+
+		return shiftedProps;
+	}
+
 	public static SingleInputSemanticProperties getSemanticPropsSingle(
 			Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
 		if (set == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index a051eb0..115a238 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -40,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -129,6 +131,29 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		return function;
 	}
 
+	/**
+	 * Returns the semantic properties of this operator. If at least one input is keyed by a
+	 * key selector function, the source field indexes of that input are offset by the
+	 * number of fields of the selector's key type.
+	 *
+	 * @return The (possibly offset) semantic properties; the result of the super class is
+	 *         returned unchanged if it is null or if no input uses a key selector.
+	 */
+	@Override
+	public DualInputSemanticProperties getSemanticProperties() {
+
+		final DualInputSemanticProperties udfProps = super.getSemanticProperties();
+		final boolean selectorOnFirst = this.keys1 instanceof Keys.SelectorFunctionKeys;
+		final boolean selectorOnSecond = this.keys2 instanceof Keys.SelectorFunctionKeys;
+
+		// nothing to adapt without properties or without any key selector
+		if (udfProps == null || !(selectorOnFirst || selectorOnSecond)) {
+			return udfProps;
+		}
+
+		final int fieldCount1 = this.getInput1Type().getTotalFields();
+		final int fieldCount2 = this.getInput2Type().getTotalFields();
+
+		// an input without key selector keeps its field indexes (offset 0)
+		int keyFields1 = 0;
+		if (selectorOnFirst) {
+			keyFields1 = ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields();
+		}
+		int keyFields2 = 0;
+		if (selectorOnSecond) {
+			keyFields2 = ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields();
+		}
+
+		return SemanticPropUtil.addSourceFieldOffsets(udfProps, fieldCount1, fieldCount2, keyFields1, keyFields2);
+	}
+
 	protected Keys<I1> getKeys1() {
 		return this.keys1;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 911c608..dc26fec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
@@ -86,6 +88,27 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		return function;
 	}
 
+	/**
+	 * Returns the semantic properties of this operator. If the input is grouped by a key
+	 * selector function, all source field indexes are offset by the number of fields of the
+	 * selector's key type, plus the fields of the sort selector key for a sorted grouping.
+	 *
+	 * @return The (possibly offset) semantic properties; the result of the super class is
+	 *         returned unchanged if it is null or if no key selector grouping is used.
+	 */
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		final SingleInputSemanticProperties udfProps = super.getSemanticProperties();
+
+		// nothing to adapt without properties, without grouping, or without key selector
+		if (udfProps == null
+				|| this.grouper == null
+				|| !(this.grouper.keys instanceof Keys.SelectorFunctionKeys)) {
+			return udfProps;
+		}
+
+		// number of key fields to offset the source indexes by
+		int keyFieldCount = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+		if (this.grouper instanceof SortedGrouping) {
+			keyFieldCount += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+		}
+
+		return SemanticPropUtil.addSourceFieldOffset(udfProps, this.getInputType().getTotalFields(), keyFieldCount);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c96b7c6..bc4413f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
@@ -120,6 +122,26 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		return this;
 	}
 
+	/**
+	 * Returns the semantic properties of this operator. If the input is grouped by a key
+	 * selector function, all source field indexes are offset by the number of fields of the
+	 * selector's key type, plus the fields of the sort selector key for a sorted grouping.
+	 *
+	 * @return The (possibly offset) semantic properties; the result of the super class is
+	 *         returned unchanged if it is null or if no key selector grouping is used.
+	 */
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		final SingleInputSemanticProperties udfProps = super.getSemanticProperties();
+
+		// nothing to adapt without properties, without grouping, or without key selector
+		if (udfProps == null
+				|| this.grouper == null
+				|| !(this.grouper.keys instanceof Keys.SelectorFunctionKeys)) {
+			return udfProps;
+		}
+
+		// number of key fields to offset the source indexes by
+		int keyFieldCount = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+		if (this.grouper instanceof SortedGrouping) {
+			keyFieldCount += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+		}
+
+		return SemanticPropUtil.addSourceFieldOffset(udfProps, this.getInputType().getTotalFields(), keyFieldCount);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e450ae1..4adf6b3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -225,6 +225,29 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		@Override
+		public DualInputSemanticProperties getSemanticProperties() {
+
+			// Semantic properties as provided by the super class. If an input is keyed by a
+			// key selector function, its source field indexes are offset below by the number
+			// of fields of the selector's key type.
+			final DualInputSemanticProperties udfProps = super.getSemanticProperties();
+			final boolean selectorOnFirst = this.keys1 instanceof Keys.SelectorFunctionKeys;
+			final boolean selectorOnSecond = this.keys2 instanceof Keys.SelectorFunctionKeys;
+
+			// nothing to adapt without properties or without any key selector
+			if (udfProps == null || !(selectorOnFirst || selectorOnSecond)) {
+				return udfProps;
+			}
+
+			final int fieldCount1 = this.getInput1Type().getTotalFields();
+			final int fieldCount2 = this.getInput2Type().getTotalFields();
+
+			// an input without key selector keeps its field indexes (offset 0)
+			int keyFields1 = 0;
+			if (selectorOnFirst) {
+				keyFields1 = ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields();
+			}
+			int keyFields2 = 0;
+			if (selectorOnSecond) {
+				keyFields2 = ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields();
+			}
+
+			return SemanticPropUtil.addSourceFieldOffsets(udfProps, fieldCount1, fieldCount2, keyFields1, keyFields2);
+		}
+
+		@Override
 		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
 			if (function instanceof DefaultJoin.WrappingFlatJoinFunction) {
 				return super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) function).getWrappedFunction().getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 5951df8..e770278 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -78,6 +80,26 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	}
 
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		SingleInputSemanticProperties props = super.getSemanticProperties(); // start from the properties reported by the superclass
+
+		// Shift the source field indexes of the semantic properties by the number of key fields; only done when a grouper with key-selector keys is set.
+		if(props != null &&
+				this.grouper != null &&
+				this.grouper.keys instanceof Keys.SelectorFunctionKeys) { // NOTE(review): presumably needed because extracted keys are placed in front of the record during translation -- confirm
+
+			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields(); // total fields of the grouping key type
+			if(this.grouper instanceof SortedGrouping) {
+				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); // plus total fields of the sort key-selector's key type
+			}
+			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset); // args: properties, total fields of the input type, offset
+		}
+
+		return props; // returned as obtained from super (possibly null) when the condition above does not hold
+	}
+
+	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
 		
 		String name = getName() != null ? getName() : "Reduce at "+defaultName;

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 48d2d1a..a98c899 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-
+@ForwardedFields("*->2")
 public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
 
 	private static final long serialVersionUID = 1L;