You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 15:45:08 UTC

[1/3] flink git commit: [hotfix] Fix Mutable Object window aggregator/Disable Object Copy

Repository: flink
Updated Branches:
  refs/heads/master bbb75c599 -> c36977f76


[hotfix] Fix Mutable Object window aggregator/Disable Object Copy

This fixes the aggregators to make copies of the objects so that it
works with window operators that are not mutable-object safe.

This also disables object copy in WindowOperator and
NonKeyedWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c36977f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c36977f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c36977f7

Branch: refs/heads/master
Commit: c36977f7676a5e9c0a1a8f45815d9517e56d38ae
Parents: 256a88a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 13:08:35 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:37:55 2015 +0200

----------------------------------------------------------------------
 .../aggregation/AggregationFunction.java        |  8 +--
 .../aggregation/ComparableAggregator.java       | 51 ++++++++++----------
 .../functions/aggregation/SumAggregator.java    | 35 +++++++++++---
 .../functions/windowing/FoldWindowFunction.java | 12 ++---
 .../windowing/NonKeyedWindowOperator.java       |  5 +-
 .../operators/windowing/WindowOperator.java     |  5 +-
 6 files changed, 66 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index 23cca90..ed39103 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -22,13 +22,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
-	public int position;
-
-	public AggregationFunction(int pos) {
-		this.position = pos;
-	}
-
-	public static enum AggregationType {
+	public enum AggregationType {
 		SUM, MIN, MAX, MINBY, MAXBY,
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index e5501a0..e70e30a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -25,35 +25,39 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	public Comparator comparator;
-	public boolean byAggregate;
-	public boolean first;
-	FieldAccessor<T, Object> fieldAccessor;
+	private Comparator comparator;
+	private boolean byAggregate;
+	private boolean first;
+	private final FieldAccessor<T, Object> fieldAccessor;
 	
-	private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
-		super(pos);
+	private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
 		this.comparator = Comparator.getForAggregation(aggregationType);
 		this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
 		this.first = first;
+		this.fieldAccessor = fieldAccessor;
 	}
 
-	public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType
-			, ExecutionConfig config) {
+	public ComparableAggregator(int positionToAggregate,
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			ExecutionConfig config) {
 		this(positionToAggregate, typeInfo, aggregationType, false, config);
 	}
 
-	public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType,
-								boolean first, ExecutionConfig config) {
-		this(positionToAggregate, aggregationType, first);
-		this.fieldAccessor = FieldAccessor.create(positionToAggregate, typeInfo, config);
-		this.first = first;
+	public ComparableAggregator(int positionToAggregate,
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			boolean first,
+			ExecutionConfig config) {
+		this(aggregationType, FieldAccessor.create(positionToAggregate, typeInfo, config), first);
 	}
 
 	public ComparableAggregator(String field,
-			TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) {
-		this(0, aggregationType, first);
-		this.fieldAccessor = FieldAccessor.create(field, typeInfo, config);
-		this.first = first;
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			boolean first,
+			ExecutionConfig config) {
+		this(aggregationType, FieldAccessor.create(field, typeInfo, config), first);
 	}
 
 
@@ -66,16 +70,13 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 		int c = comparator.isExtremal(o1, o2);
 
 		if (byAggregate) {
-			if (c == 1) {
-				return value1;
-			}
-			if (first) {
-				if (c == 0) {
-					return value1;
-				}
+			// if they are the same we choose based on whether we want to first or last
+			// element with the min/max.
+			if (c == 0) {
+				return first ? value1 : value2;
 			}
 
-			return value2;
+			return c == 1 ? value1 : value2;
 
 		} else {
 			if (c == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index b045233..8c9cf7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -19,30 +19,53 @@ package org.apache.flink.streaming.api.functions.aggregation;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.util.FieldAccessor;
 
 public class SumAggregator<T> extends AggregationFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	FieldAccessor<T, Object> fieldAccessor;
-	SumFunction adder;
+	private final FieldAccessor<T, Object> fieldAccessor;
+	private final SumFunction adder;
+	private final TypeSerializer<T> serializer;
+	private final boolean isTuple;
 
 	public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) {
-		super(pos);
 		fieldAccessor = FieldAccessor.create(pos, typeInfo, config);
 		adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+		if (typeInfo instanceof TupleTypeInfo) {
+			isTuple = true;
+			serializer = null;
+		} else {
+			isTuple = false;
+			this.serializer = typeInfo.createSerializer(config);
+		}
 	}
 
 	public SumAggregator(String field, TypeInformation<T> typeInfo, ExecutionConfig config) {
-		super(0);
 		fieldAccessor = FieldAccessor.create(field, typeInfo, config);
 		adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+		if (typeInfo instanceof TupleTypeInfo) {
+			isTuple = true;
+			serializer = null;
+		} else {
+			isTuple = false;
+			this.serializer = typeInfo.createSerializer(config);
+		}
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
+	@SuppressWarnings("unchecked")
 	public T reduce(T value1, T value2) throws Exception {
-		return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		if (isTuple) {
+			Tuple result = ((Tuple)value1).copy();
+			return fieldAccessor.set((T) result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		} else {
+			T result = serializer.copy(value1);
+			return fieldAccessor.set(result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
index 04d2ac7..1d29e36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -34,6 +33,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 
 public class FoldWindowFunction<K, W extends Window, T, R>
 		extends WrappingFunction<FoldFunction<T, R>>
@@ -49,9 +49,8 @@ public class FoldWindowFunction<K, W extends Window, T, R>
 		this.initialValue = initialValue;
 	}
 
-	@Override
-	public void open(Configuration configuration) throws Exception {
-		super.open(configuration);
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
 
 		if (serializedInitialValue == null) {
 			throw new RuntimeException("No initial value was serialized for the fold " +
@@ -59,10 +58,11 @@ public class FoldWindowFunction<K, W extends Window, T, R>
 		}
 
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+		InputViewDataInputStreamWrapper inStream = new InputViewDataInputStreamWrapper(
 				new DataInputStream(bais)
 		);
-		initialValue = outSerializer.deserialize(in);
+
+		initialValue = outSerializer.deserialize(inStream);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a002b23..03e8c4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -231,9 +231,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				context = new Context(window, windowBuffer);
 				windows.put(window, context);
 			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+			context.windowBuffer.storeElement(element);
+			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, window);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c36977f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a80f971..30ce477 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -276,9 +276,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 				context = new Context(key, window, windowBuffer);
 				keyWindows.put(window, context);
 			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+			context.windowBuffer.storeElement(element);
+			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}


[2/3] flink git commit: [hotfix] Fix broken copy in OperatorChain

Posted by al...@apache.org.
[hotfix] Fix broken copy in OperatorChain

Before, the StreamRecords was not copied, now it is.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256a88a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256a88a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256a88a1

Branch: refs/heads/master
Commit: 256a88a186dd7fe2a958d4bbc826fee3b806efc6
Parents: d056f11
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 14:38:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:37:55 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OperatorChain.java   | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256a88a1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b42b888..ac27093 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -301,22 +301,19 @@ public class OperatorChain<OUT> {
 		
 		private final TypeSerializer<T> serializer;
 		
-		private final StreamRecord<T> copyRecord;
-
 		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
 			super(operator);
 			this.serializer = serializer;
-			this.copyRecord = new StreamRecord<T>(null, 0L);
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
-				T copy = serializer.copy(record.getValue());
-				copyRecord.replace(copy, record.getTimestamp());
-				
-				operator.setKeyContextElement(copyRecord);
-				operator.processElement(copyRecord);
+
+				StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
+
+				operator.setKeyContextElement(copy);
+				operator.processElement(copy);
 			}
 			catch (Exception e) {
 				throw new RuntimeException("Could not forward element to next operator", e);


[3/3] flink git commit: Add copy() to Tuple base class.

Posted by al...@apache.org.
Add copy() to Tuple base class.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d056f11d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d056f11d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d056f11d

Branch: refs/heads/master
Commit: d056f11ddf42a944b113a4ad46874a5b4d237653
Parents: bbb75c5
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 12:48:02 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:37:55 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/api/java/tuple/Tuple.java  |  8 +++++++-
 .../main/java/org/apache/flink/api/java/tuple/Tuple0.java | 10 ++++++++++
 .../main/java/org/apache/flink/api/java/tuple/Tuple1.java |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple10.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple11.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple12.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple13.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple14.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple15.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple16.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple17.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple18.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple19.java     |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple2.java |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple20.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple21.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple22.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple23.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple24.java     |  2 ++
 .../java/org/apache/flink/api/java/tuple/Tuple25.java     |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple3.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple4.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple5.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple6.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple7.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple8.java |  2 ++
 .../main/java/org/apache/flink/api/java/tuple/Tuple9.java |  2 ++
 .../org/apache/flink/api/java/tuple/TupleGenerator.java   |  2 ++
 28 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 3b07aed..71b1f61 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -81,7 +81,13 @@ public abstract class Tuple implements java.io.Serializable {
 	 * @return The number of fields in the tuple.
 	 */
 	public abstract int getArity();
-	
+
+	/**
+	 * Shallow tuple copy.
+	 * @return A new Tuple with the same fields as this.
+	 */
+	public abstract <T extends Tuple> T copy();
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
index 1caee2a..c3386dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
@@ -50,6 +50,16 @@ public class Tuple0 extends Tuple {
 		throw new IndexOutOfBoundsException(String.valueOf(pos));
 	}
 
+	/**
+	 * Shallow tuple copy.
+	 * @return A new Tuple with the same fields as this.
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	public Tuple0 copy(){
+		return new Tuple0();
+	}
+
 	// -------------------------------------------------------------------------------------------------
 	// standard utilities
 	// -------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple1.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple1.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple1.java
index 8e0e3dd..01c5501 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple1.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple1.java
@@ -134,6 +134,8 @@ public class Tuple1<T0> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple1<T0> copy(){ 
 		return new Tuple1<T0>(this.f0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple10.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple10.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple10.java
index 7dd0e62..ea24f40 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple10.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple10.java
@@ -260,6 +260,8 @@ public class Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple10<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9> copy(){ 
 		return new Tuple10<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple11.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple11.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple11.java
index f1eabe3..b58ae6c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple11.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple11.java
@@ -274,6 +274,8 @@ public class Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> extends Tuple
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple11<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10> copy(){ 
 		return new Tuple11<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple12.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple12.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple12.java
index 304ef1e..4e7defb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple12.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple12.java
@@ -288,6 +288,8 @@ public class Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> extends T
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple12<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11> copy(){ 
 		return new Tuple12<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple13.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple13.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple13.java
index a546917..9543912 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple13.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple13.java
@@ -302,6 +302,8 @@ public class Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> exte
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple13<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12> copy(){ 
 		return new Tuple13<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple14.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple14.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple14.java
index 5b0c8e4..36d508e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple14.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple14.java
@@ -316,6 +316,8 @@ public class Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple14<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13> copy(){ 
 		return new Tuple14<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
index cd9230a..0d45352 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
@@ -330,6 +330,8 @@ public class Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple15<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14> copy(){ 
 		return new Tuple15<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple16.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple16.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple16.java
index 4f5c264..5c57ee7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple16.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple16.java
@@ -344,6 +344,8 @@ public class Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple16<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15> copy(){ 
 		return new Tuple16<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
index 826ad0c..70da5bb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
@@ -358,6 +358,8 @@ public class Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple17<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16> copy(){ 
 		return new Tuple17<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple18.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple18.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple18.java
index a81bff8..c221ee6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple18.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple18.java
@@ -372,6 +372,8 @@ public class Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple18<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17> copy(){ 
 		return new Tuple18<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple19.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple19.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple19.java
index 6fed55c..3d4d262 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple19.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple19.java
@@ -386,6 +386,8 @@ public class Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple19<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18> copy(){ 
 		return new Tuple19<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple2.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple2.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple2.java
index 51b3641..4ec930c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple2.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple2.java
@@ -156,6 +156,8 @@ public class Tuple2<T0, T1> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple2<T0,T1> copy(){ 
 		return new Tuple2<T0,T1>(this.f0,
 			this.f1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
index c743cb7..8116121 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
@@ -400,6 +400,8 @@ public class Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple20<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19> copy(){ 
 		return new Tuple20<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple21.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple21.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple21.java
index 5749a1e..6a1aeab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple21.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple21.java
@@ -414,6 +414,8 @@ public class Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple21<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20> copy(){ 
 		return new Tuple21<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
index 7a60808..5f7194b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
@@ -428,6 +428,8 @@ public class Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple22<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21> copy(){ 
 		return new Tuple22<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple23.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple23.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple23.java
index f0002fd..35c71ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple23.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple23.java
@@ -442,6 +442,8 @@ public class Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple23<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22> copy(){ 
 		return new Tuple23<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple24.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple24.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple24.java
index 874c090..5c7f91f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple24.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple24.java
@@ -456,6 +456,8 @@ public class Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple24<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22,T23> copy(){ 
 		return new Tuple24<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22,T23>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
index 1b86c0b..901b838 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
@@ -470,6 +470,8 @@ public class Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple25<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22,T23,T24> copy(){ 
 		return new Tuple25<T0,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15,T16,T17,T18,T19,T20,T21,T22,T23,T24>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple3.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple3.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple3.java
index 36824ac..3a46222 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple3.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple3.java
@@ -162,6 +162,8 @@ public class Tuple3<T0, T1, T2> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple3<T0,T1,T2> copy(){ 
 		return new Tuple3<T0,T1,T2>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple4.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple4.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple4.java
index 4e24b1e..0556ad4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple4.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple4.java
@@ -176,6 +176,8 @@ public class Tuple4<T0, T1, T2, T3> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple4<T0,T1,T2,T3> copy(){ 
 		return new Tuple4<T0,T1,T2,T3>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple5.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple5.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple5.java
index c253e7e..15d3ea9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple5.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple5.java
@@ -190,6 +190,8 @@ public class Tuple5<T0, T1, T2, T3, T4> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple5<T0,T1,T2,T3,T4> copy(){ 
 		return new Tuple5<T0,T1,T2,T3,T4>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple6.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple6.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple6.java
index 079db12..1d45a26 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple6.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple6.java
@@ -204,6 +204,8 @@ public class Tuple6<T0, T1, T2, T3, T4, T5> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple6<T0,T1,T2,T3,T4,T5> copy(){ 
 		return new Tuple6<T0,T1,T2,T3,T4,T5>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple7.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple7.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple7.java
index b77535c..2008117 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple7.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple7.java
@@ -218,6 +218,8 @@ public class Tuple7<T0, T1, T2, T3, T4, T5, T6> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple7<T0,T1,T2,T3,T4,T5,T6> copy(){ 
 		return new Tuple7<T0,T1,T2,T3,T4,T5,T6>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple8.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple8.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple8.java
index 12a6e9d..7aaa265 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple8.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple8.java
@@ -232,6 +232,8 @@ public class Tuple8<T0, T1, T2, T3, T4, T5, T6, T7> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple8<T0,T1,T2,T3,T4,T5,T6,T7> copy(){ 
 		return new Tuple8<T0,T1,T2,T3,T4,T5,T6,T7>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple9.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple9.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple9.java
index 198360d..86ea998 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple9.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple9.java
@@ -246,6 +246,8 @@ public class Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8> extends Tuple {
 	* Shallow tuple copy.
 	* @return A new Tuple with the same fields as this.
 	*/
+	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple9<T0,T1,T2,T3,T4,T5,T6,T7,T8> copy(){ 
 		return new Tuple9<T0,T1,T2,T3,T4,T5,T6,T7,T8>(this.f0,
 			this.f1,

http://git-wip-us.apache.org/repos/asf/flink/blob/d056f11d/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index 66ee25f..a2d37ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -727,6 +727,8 @@ class TupleGenerator {
 		w.println("\t* Shallow tuple copy.");
 		w.println("\t* @return A new Tuple with the same fields as this.");
 		w.println("\t*/");
+		w.println("\t@Override");
+		w.println("\t@SuppressWarnings(\"unchecked\")");
 		w.println("\tpublic " + className + tupleTypes + " copy(){ ");
 
 		w.print("\t\treturn new " + className + tupleTypes + "(this.f0");