You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 10:07:54 UTC

[2/4] flink git commit: [FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended

[FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended

This closes #1279


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32b0dfd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32b0dfd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32b0dfd1

Branch: refs/heads/master
Commit: 32b0dfd102c4264a26ad9b5f6bb5779f8a8d33da
Parents: 4db2706
Author: zentol <s....@web.de>
Authored: Wed Oct 21 15:27:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 22 20:51:25 2015 +0200

----------------------------------------------------------------------
 .../api/java/operators/ProjectOperator.java     | 104 +++++--------------
 .../flink/api/java/tuple/TupleGenerator.java    |   2 +-
 .../SemanticPropertiesProjectionTest.java       |   4 +-
 .../api/python/flink/test/test_main.py          |   2 +-
 4 files changed, 29 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 55e182f..de79383 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -47,21 +47,11 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	extends SingleInputOperator<IN, OUT, ProjectOperator<IN, OUT>> {
 	
 	protected final int[] fields;
-	
-	private Projection<IN> proj;
 
 	public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType) {
 		super(input, returnType);
 	
 		this.fields = fields;
-		proj = null;
-	}
-	
-	public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType, Projection<IN> proj) {
-		super(input, returnType);
-	
-		this.fields = fields;
-		this.proj = proj;
 	}
 
 	@Override
@@ -77,28 +67,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
 
 		return ppo;
 	}
-
-	/**
-	 * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
-	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
-	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * Additional fields can be added to the projection by calling this method repeatedly.
-	 *
-	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
-	 *
-	 * @param fieldIndexes The field indexes which are added to the Project transformation.
-	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
-	 * @return A ProjectOperator that represents the projected DataSet.
-	 *
-	 * @see Tuple
-	 * @see DataSet
-	 * @see ProjectOperator
-	 */
-	public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
-		proj.acceptAdditionalIndexes(fieldIndexes);
-		
-		return proj.projectTupleX();
-	}
 	/**
 	 * Deprecated method only kept for compatibility.
 	 */
@@ -146,28 +114,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			this.fieldIndexes = fieldIndexes;
 		}
 		
-		private void acceptAdditionalIndexes(int... additionalIndexes) {
-			
-			if(additionalIndexes.length == 0) {
-				throw new IllegalArgumentException("project() needs to select at least one (1) field.");
-			} else if(additionalIndexes.length > Tuple.MAX_ARITY - 1) {
-				throw new IllegalArgumentException(
-						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
-			}
-			
-			int offset = this.fieldIndexes.length;
-			
-			this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
-			
-			int maxFieldIndex = ds.getType().getArity();
-			for (int i = 0; i < additionalIndexes.length; i++) {
-				Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
-
-				this.fieldIndexes[offset + i] = additionalIndexes[i];
-			}
-		}
-		
-		
 		
 		// --------------------------------------------------------------------------------------------	
 		// The following lines are generated.
@@ -231,7 +177,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
 
-			return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -246,7 +192,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
 
-			return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -261,7 +207,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
 
-			return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -276,7 +222,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
 
-			return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -291,7 +237,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
 
-			return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -306,7 +252,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
 
-			return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -321,7 +267,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
 
-			return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -336,7 +282,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
 
-			return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -351,7 +297,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
 
-			return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -366,7 +312,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
 
-			return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -381,7 +327,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
 
-			return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -396,7 +342,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
 
-			return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -411,7 +357,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
 
-			return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -426,7 +372,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
 
-			return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -441,7 +387,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
 
-			return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -456,7 +402,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
 
-			return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -471,7 +417,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
 
-			return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -486,7 +432,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
 
-			return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -501,7 +447,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
 
-			return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -516,7 +462,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
 
-			return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -531,7 +477,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
 
-			return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -546,7 +492,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
 
-			return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -561,7 +507,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
 
-			return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -576,7 +522,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
 
-			return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -591,7 +537,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
 
-			return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		// END_OF_TUPLE_DEPENDENT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index f306fe0..66ee25f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -304,7 +304,7 @@ class TupleGenerator {
 			// create and return new project operator
 			sb.append("\t\t\treturn new ProjectOperator<T, Tuple"+numFields+"<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(this.ds, this.fieldIndexes, tType, this);\n");
+			sb.append(">>(this.ds, this.fieldIndexes, tType);\n");
 
 			// method end
 			sb.append("\t\t}\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 916086b..0368ca3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -71,7 +71,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat<Tuple>());
+		tupleDs.project(1, 3, 2, 0, 3).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -97,7 +97,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
 
-		tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat<Tuple>());
+		tupleDs.project(2, 3, 1, 2).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
index 29113f3..2116d1f 100644
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
@@ -232,7 +232,7 @@ if __name__ == "__main__":
         .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
 
     d2 \
-        .project(0, 1).project(2) \
+        .project(0, 1, 2) \
         .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
 
     d2 \