You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/14 23:26:30 UTC

[1/4] incubator-flink git commit: [FLINK-1322] Make Scala API respect WriteMode set in Config

Repository: incubator-flink
Updated Branches:
  refs/heads/master 4cc6bb1db -> 13968cd4d


[FLINK-1322] Make Scala API respect WriteMode set in Config

This closes #266


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0028238b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0028238b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0028238b

Branch: refs/heads/master
Commit: 0028238b31e36bb13f0642672d3a493f428f90eb
Parents: 4cc6bb1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Dec 12 15:48:19 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:00:35 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala      | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0028238b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 2edf65e..13d1f08 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1143,9 +1143,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    */
   def writeAsText(
       filePath: String,
-      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+      writeMode: FileSystem.WriteMode = null): DataSink[T] = {
     val tof: TextOutputFormat[T] = new TextOutputFormat[T](new Path(filePath))
-    tof.setWriteMode(writeMode)
+    if (writeMode != null) {
+      tof.setWriteMode(writeMode)
+    }
     output(tof)
   }
 
@@ -1158,10 +1160,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       filePath: String,
       rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
       fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
-      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+      writeMode: FileSystem.WriteMode = null): DataSink[T] = {
     Validate.isTrue(javaSet.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
     val of = new ScalaCsvOutputFormat[Product](new Path(filePath), rowDelimiter, fieldDelimiter)
-    of.setWriteMode(writeMode)
+    if (writeMode != null) {
+      of.setWriteMode(writeMode)
+    }
     output(of.asInstanceOf[OutputFormat[T]])
   }
 
@@ -1172,11 +1176,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def write(
       outputFormat: FileOutputFormat[T],
       filePath: String,
-      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+      writeMode: FileSystem.WriteMode = null): DataSink[T] = {
     Validate.notNull(filePath, "File path must not be null.")
     Validate.notNull(outputFormat, "Output format must not be null.")
     outputFormat.setOutputFilePath(new Path(filePath))
-    outputFormat.setWriteMode(writeMode)
+    if (writeMode != null) {
+      outputFormat.setWriteMode(writeMode)
+    }
     output(outputFormat)
   }
 


[3/4] incubator-flink git commit: [misc] Fix/suppress various compiler warnings.

Posted by se...@apache.org.
[misc] Fix/suppress various compiler warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/de7f478f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/de7f478f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/de7f478f

Branch: refs/heads/master
Commit: de7f478fc6dc9933ce84e6018a14e457ba883db1
Parents: 9e40366
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Dec 14 16:22:32 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:22:53 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/addons/hbase/TableInputFormat.java  | 2 +-
 .../apache/flink/spargel/java/record/SpargelIteration.java    | 1 +
 .../CustomPartitioningGlobalOptimizationTest.java             | 7 +------
 .../apache/flink/runtime/operators/hash/HashPartition.java    | 5 ++---
 .../flink/runtime/operators/hash/InMemoryPartition.java       | 2 +-
 .../apache/flink/runtime/operators/hash/MutableHashTable.java | 3 ---
 .../apache/flink/api/scala/operators/ScalaCsvInputFormat.java | 2 +-
 .../apache/flink/test/javaApiOperators/GroupReduceITCase.java | 2 ++
 .../org/apache/flink/test/javaApiOperators/ReduceITCase.java  | 1 +
 .../flink/test/javaApiOperators/util/CollectionDataSets.java  | 6 ++++--
 .../flink/test/recordJobTests/CollectionSourceTest.java       | 2 +-
 .../flink/test/recordJobTests/CollectionValidationTest.java   | 2 +-
 .../apache/flink/test/recordJobTests/GlobalSortingITCase.java | 2 +-
 .../test/recordJobTests/GlobalSortingMixedOrderITCase.java    | 2 +-
 .../flink/test/recordJobTests/GroupOrderReduceITCase.java     | 1 +
 .../flink/test/recordJobTests/WordCountUnionReduceITCase.java | 1 +
 .../flink/test/recordJobs/graph/ComputeEdgeDegrees.java       | 2 +-
 .../test/recordJobs/graph/ConnectedComponentsWithCoGroup.java | 5 +----
 .../apache/flink/test/recordJobs/graph/DanglingPageRank.java  | 2 +-
 .../test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java | 1 +
 .../recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java     | 3 +--
 .../flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java     | 2 +-
 .../flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java | 2 +-
 .../org/apache/flink/test/recordJobs/graph/PairwiseSP.java    | 4 +---
 .../apache/flink/test/recordJobs/graph/SimplePageRank.java    | 3 +--
 .../test/recordJobs/graph/WorksetConnectedComponents.java     | 5 +----
 .../test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java | 1 +
 .../test/recordJobs/graph/pageRankUtil/DotProductMatch.java   | 2 +-
 .../org/apache/flink/test/recordJobs/kmeans/KMeansCross.java  | 3 +--
 .../apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java | 4 +---
 .../flink/test/recordJobs/kmeans/udfs/ComputeDistance.java    | 1 +
 .../recordJobs/kmeans/udfs/ComputeDistanceParameterized.java  | 1 +
 .../flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java  | 1 +
 .../test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java   | 1 +
 .../flink/test/recordJobs/relational/MergeOnlyJoin.java       | 2 +-
 .../apache/flink/test/recordJobs/relational/TPCHQuery1.java   | 3 +--
 .../apache/flink/test/recordJobs/relational/TPCHQuery10.java  | 3 +--
 .../apache/flink/test/recordJobs/relational/TPCHQuery3.java   | 2 +-
 .../flink/test/recordJobs/relational/TPCHQuery3Unioned.java   | 2 +-
 .../apache/flink/test/recordJobs/relational/TPCHQuery4.java   | 4 ++--
 .../apache/flink/test/recordJobs/relational/TPCHQuery9.java   | 3 +--
 .../flink/test/recordJobs/relational/TPCHQueryAsterix.java    | 2 +-
 .../flink/test/recordJobs/relational/WebLogAnalysis.java      | 2 +-
 .../recordJobs/relational/query1Util/GroupByReturnFlag.java   | 2 +-
 .../test/recordJobs/relational/query1Util/LineItemFilter.java | 3 +--
 .../recordJobs/relational/query9Util/AmountAggregate.java     | 4 +---
 .../recordJobs/relational/query9Util/FilteredPartsJoin.java   | 4 +---
 .../test/recordJobs/relational/query9Util/LineItemMap.java    | 3 +--
 .../flink/test/recordJobs/relational/query9Util/OrderMap.java | 3 +--
 .../recordJobs/relational/query9Util/OrderedPartsJoin.java    | 3 +--
 .../test/recordJobs/relational/query9Util/PartFilter.java     | 3 +--
 .../flink/test/recordJobs/relational/query9Util/PartJoin.java | 3 +--
 .../test/recordJobs/relational/query9Util/PartListJoin.java   | 4 +---
 .../test/recordJobs/relational/query9Util/PartsuppMap.java    | 3 +--
 .../test/recordJobs/relational/query9Util/SupplierMap.java    | 3 +--
 .../test/recordJobs/relational/query9Util/SuppliersJoin.java  | 4 +---
 .../apache/flink/test/recordJobs/sort/ReduceGroupSort.java    | 2 +-
 .../java/org/apache/flink/test/recordJobs/sort/TeraSort.java  | 2 +-
 .../org/apache/flink/test/recordJobs/wordcount/WordCount.java | 2 +-
 .../test/recordJobs/wordcount/WordCountAccumulators.java      | 2 +-
 60 files changed, 65 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 3045031..9c861ed 100755
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -71,7 +71,7 @@ public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T
 	 * creates a {@link Scan} object and a {@link HTable} connection
 	 *
 	 * @param parameters
-	 * @see {@link Configuration}
+	 * @see Configuration
 	 */
 	@Override
 	public void configure(Configuration parameters) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 780bc94..f647e5d 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -35,6 +35,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.ReflectionUtil;
 
+@SuppressWarnings("deprecation")
 public class SpargelIteration {
 	
 	private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
index f4d4e77..1dcb78e 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -21,19 +21,13 @@ package org.apache.flink.compiler.custompartition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import org.junit.Test;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.JoinOperator;
-import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.compiler.CompilerTestBase;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -41,6 +35,7 @@ import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})
 public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 6d3194b..08acd16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -44,8 +44,8 @@ import org.apache.flink.util.MutableObjectIterator;
 
 /**
  * 
- * @param BT The type of the build side records.
- * @param PT The type of the probe side records.
+ * @param <BT> The type of the build side records.
+ * @param <PT> The type of the probe side records.
  */
 public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
 {
@@ -117,7 +117,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 * @param recursionLevel The recursion level - zero for partitions from the initial build, <i>n + 1</i> for
 	 *                       partitions that are created from spilled partition with recursion level <i>n</i>. 
 	 * @param initialBuffer The initial buffer for this partition.
-	 * @param writeBehindBuffers The queue from which to pop buffers for writing, once the partition is spilled.
 	 */
 	HashPartition(TypeSerializer<BT> buildSideAccessors, TypeSerializer<PT> probeSideAccessors,
 			int partitionNumber, int recursionLevel, MemorySegment initialBuffer, MemorySegmentSource memSource,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index e99265b..ca3eb4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
 /**
  * In-memory partition with overflow buckets for {@link CompactingHashTable}
  * 
- * @param T record type
+ * @param <T> record type
  */
 public class InMemoryPartition<T> {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 1bbf246..67f1ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1129,9 +1129,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * may free new buffers then.
 	 * 
 	 * @return The next buffer to be used by the hash-table, or null, if no buffer remains.
-	 * @throws IOException Thrown, if the thread is interrupted while grabbing the next buffer. The I/O
-	 *                     exception replaces the <tt>InterruptedException</tt> to consolidate the exception
-	 *                     signatures.
 	 */
 	final MemorySegment getNextBuffer() {
 		// check if the list directly offers memory

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index 97cbd5c..2ee1009 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -71,7 +71,7 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 		TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
 		serializer = (TupleSerializerBase<OUT>)tupleType.createSerializer();
 
-		Class[] classes = new Class[tupleType.getArity()];
+		Class<?>[] classes = new Class[tupleType.getArity()];
 		for (int i = 0; i < tupleType.getArity(); i++) {
 			classes[i] = tupleType.getTypeAt(i).getTypeClass();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 4a36fb8..60a0d89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -94,6 +94,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		return toParameterList(tConfigs);
 	}
 	
+	@SuppressWarnings("unused")
 	private static class GroupReduceProgs {
 		
 		public static String runProgram(int progId, String resultPath, boolean collectionExecution) throws Exception {
@@ -555,6 +556,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					DataSet<Integer> reduceDs = ds.groupBy("hadoopFan", "theTuple.*") // full tuple selection
 							.reduceGroup(new GroupReduceFunction<PojoContainingTupleAndWritable, Integer>() {
 								private static final long serialVersionUID = 1L;
+								
 								@Override
 								public void reduce(Iterable<PojoContainingTupleAndWritable> values,
 										Collector<Integer> out)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 20cbb4a..1fcacb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -45,6 +45,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+@SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class ReduceITCase extends JavaProgramTestBase {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 731f036..1f812d9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -422,8 +422,10 @@ public class CollectionDataSets {
 	}
 
 	public static class FromTupleWithCTor extends FromTuple {
-		public FromTupleWithCTor() {
-		}
+
+		private static final long serialVersionUID = 1L;
+
+		public FromTupleWithCTor() {}
 
 		public FromTupleWithCTor(int special, long tupleField) {
 			this.special = special;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionSourceTest.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionSourceTest.java
index 55ffeae..16d2517 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionSourceTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionSourceTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import java.io.Serializable;
@@ -40,6 +39,7 @@ import org.apache.flink.util.Collector;
 /**
  * test the collection and iterator data input using join operator
  */
+@SuppressWarnings("deprecation")
 public class CollectionSourceTest extends RecordAPITestBase {
 
 	private static final int DOP = 4;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionValidationTest.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionValidationTest.java
index 1073605..1ce7a73 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionValidationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/CollectionValidationTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import org.apache.flink.api.java.record.operators.CollectionDataSource;
@@ -29,6 +28,7 @@ import java.util.List;
 /**
  * Test the input field validation of CollectionDataSource
  */
+@SuppressWarnings("deprecation")
 public class CollectionValidationTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingITCase.java
index 73aceed..f23b129 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import java.util.ArrayList;
@@ -35,6 +34,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
 
+@SuppressWarnings("deprecation")
 public class GlobalSortingITCase extends RecordAPITestBase {
 	
 	private static final int NUM_RECORDS = 100000;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingMixedOrderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingMixedOrderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingMixedOrderITCase.java
index dfed309..b6b8b9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingMixedOrderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GlobalSortingMixedOrderITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import java.io.IOException;
@@ -39,6 +38,7 @@ import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 
+@SuppressWarnings("deprecation")
 public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
 	
 	private static final int NUM_RECORDS = 100000;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
index 1f2d1df..368f9af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
@@ -41,6 +41,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
+@SuppressWarnings("deprecation")
 public class GroupOrderReduceITCase extends RecordAPITestBase {
 
 	private static final String INPUT = "1,3\n" + "2,1\n" + "5,1\n" + "3,1\n" + "1,8\n" + "1,9\n" + 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/WordCountUnionReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/WordCountUnionReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/WordCountUnionReduceITCase.java
index 2c09636..52c815b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/WordCountUnionReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/WordCountUnionReduceITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.StringValue;
  * This test case is an adaption of issue #192 (and #124), which revealed problems with the union readers in Nephele.
  * The problems have been fixed with commit 1228a5e. Without this commit the test will deadlock.
  */
+@SuppressWarnings("deprecation")
 public class WordCountUnionReduceITCase extends RecordAPITestBase {
 
 	private static final int MULTIPLY = 1000;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ComputeEdgeDegrees.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ComputeEdgeDegrees.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ComputeEdgeDegrees.java
index c3c8205..9ec8c73 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ComputeEdgeDegrees.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ComputeEdgeDegrees.java
@@ -38,7 +38,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-
+@SuppressWarnings("deprecation")
 public class ComputeEdgeDegrees implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java
index 27c34c5..cc6fd01 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java
@@ -41,10 +41,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-
-/**
- *
- */
+@SuppressWarnings("deprecation")
 public class ConnectedComponentsWithCoGroup implements Program {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
index 049c9c4..c87dd64 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
@@ -36,7 +36,7 @@ import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStatsAggregat
 import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
 import org.apache.flink.types.LongValue;
 
-
+@SuppressWarnings("deprecation")
 public class DanglingPageRank implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java
index bb5343e..81adbf5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DeltaPageRankWithInitialDeltas.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class DeltaPageRankWithInitialDeltas implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java
index 7e8f858..b2328ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesOnEdgesWithDegrees.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import java.io.Serializable;
@@ -39,12 +38,12 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-
 /**
  * An implementation of the triangle enumeration, which expects its input to
  * encode the degrees of the vertices. The algorithm selects the lower-degree vertex for the
  * enumeration of open triads.
  */
+@SuppressWarnings("deprecation")
 public class EnumTrianglesOnEdgesWithDegrees implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java
index 5662400..ba8b54a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesRdfFoaf.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import java.io.Serializable;
@@ -52,6 +51,7 @@ import org.apache.flink.util.Collector;
  * The algorithm was published as MapReduce job by J. Cohen in "Graph Twiddling in a MapReduce World".
  * The Pact version was described in "MapReduce and PACT - Comparing Data Parallel Programming Models" (BTW 2011).
  */
+@SuppressWarnings("deprecation")
 public class EnumTrianglesRdfFoaf implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
index 6699030..dc52158 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import org.apache.flink.api.common.Plan;
@@ -43,6 +42,7 @@ import org.apache.flink.types.IntValue;
  * to compute the degrees of the vertices and to select the lower-degree vertex for the
  * enumeration of open triads.
  */
+@SuppressWarnings("deprecation")
 public class EnumTrianglesWithDegrees implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
index 0886dbb..55e2f57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import java.io.IOException;
@@ -58,9 +57,8 @@ import org.apache.flink.util.Collector;
  * 2) The programs text-serialization for paths (see @see PathInFormat and @see PathOutFormat). 
  * 
  * The RDF input format is used if the 4th parameter of the getPlan() method is set to "true". If set to "false" the path input format is used. 
- *  
- *
  */
+@SuppressWarnings("deprecation")
 public class PairwiseSP implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
index ccd3213..3abf743 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import java.io.Serializable;
@@ -44,7 +43,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-
+@SuppressWarnings("deprecation")
 public class SimplePageRank implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
index 2f6f740..0dbb20a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph;
 
 import java.io.Serializable;
@@ -43,9 +42,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-/**
- *
- */
+@SuppressWarnings("deprecation")
 public class WorksetConnectedComponents implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
index a1bcba6..d4f7a5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Collector;
  * INPUT = (pageId, currentRank, dangling), (pageId, partialRank).
  * OUTPUT = (pageId, newRank, dangling)
  */
+@SuppressWarnings("deprecation")
 @ConstantFieldsFirst(0)
 public class DotProductCoGroup extends CoGroupFunction implements Serializable {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
index b52d9eb..339cef5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.graph.pageRankUtil;
 
 import java.io.Serializable;
@@ -32,6 +31,7 @@ import org.apache.flink.util.Collector;
  * INPUT = (pageId, rank, dangling), (pageId, neighbors-list).
  * OUTPUT = (targetPageId, partialRank)
  */
+@SuppressWarnings("deprecation")
 public class DotProductMatch extends JoinFunction implements Serializable {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
index d5d71ad..4069f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.kmeans;
 
 import java.util.ArrayList;
@@ -38,7 +37,7 @@ import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
 import org.apache.flink.test.recordJobs.kmeans.udfs.RecomputeClusterCenter;
 import org.apache.flink.types.IntValue;
 
-
+@SuppressWarnings("deprecation")
 public class KMeansCross implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
index f937ca8..bdf7466 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.kmeans;
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,7 +45,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-
+@SuppressWarnings("deprecation")
 public class KMeansSingleStep implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
index 16267f6..ee33113 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.Record;
  * Cross PACT computes the distance of all data points to all cluster
  * centers.
  */
+@SuppressWarnings("deprecation")
 @ConstantFieldsFirst({0,1})
 public class ComputeDistance extends CrossFunction implements Serializable {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
index fea1468..78b60ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Collector;
  * Cross PACT computes the distance of all data points to all cluster
  * centers.
  */
+@SuppressWarnings("deprecation")
 @ConstantFieldsFirst({0,1})
 public class ComputeDistanceParameterized extends MapFunction implements Serializable {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
index 9e7fcf0..1e893ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Collector;
  * Reduce PACT determines the closes cluster center for a data point. This
  * is a minimum aggregation. Hence, a Combiner can be easily implemented.
  */
+@SuppressWarnings("deprecation")
 @Combinable
 @ConstantFields(1)
 public class FindNearestCenter extends ReduceFunction implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
index fd842c3..89e222b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Collector;
  * 0: clusterID
  * 1: clusterVector
  */
+@SuppressWarnings("deprecation")
 @Combinable
 @ConstantFields(0)
 public class RecomputeClusterCenter extends ReduceFunction implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
index 8413a47..74b8f4c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import java.util.Iterator;
@@ -37,6 +36,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class MergeOnlyJoin implements Program {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
index 48103d1..a3c4c74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import org.apache.flink.api.common.Plan;
@@ -32,7 +31,7 @@ import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
 import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
 import org.apache.flink.types.StringValue;
 
-
+@SuppressWarnings("deprecation")
 public class TPCHQuery1 implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
index f4afb9d..7a8ffc6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import java.io.IOException;
@@ -44,7 +43,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
 public class TPCHQuery10 implements Program, ProgramDescription {
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
index 500e160..cebe6f9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import java.io.Serializable;
@@ -64,6 +63,7 @@ import org.apache.flink.util.Collector;
  *     AND o_orderpriority LIKE "Z%"
  * GROUP BY l_orderkey, o_shippriority;
  */
+@SuppressWarnings("deprecation")
 public class TPCHQuery3 implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
index cc48949..157e3cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import org.apache.flink.api.common.Plan;
@@ -54,6 +53,7 @@ import org.apache.flink.types.StringValue;
  *     AND o_orderpriority LIKE "Z%"
  * GROUP BY l_orderkey, o_shippriority;
  */
+@SuppressWarnings("deprecation")
 public class TPCHQuery3Unioned implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
index 6e5ee39..2103747 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import java.text.ParseException;
@@ -51,7 +50,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of the TPC-H Query 4 as a Flink program.
  */
-@SuppressWarnings("serial")
+
+@SuppressWarnings({"serial", "deprecation"})
 public class TPCHQuery4 implements Program, ProgramDescription {
 
 	private static Logger LOG = LoggerFactory.getLogger(TPCHQuery4.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
index 3d87f66..925ed5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import org.apache.flink.api.common.Plan;
@@ -80,7 +79,7 @@ import org.slf4j.LoggerFactory;
  * <b>Attention:</b> The "order by" part is not implemented!
  * 
  */
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
 public class TPCHQuery9 implements Program, ProgramDescription {
 	public final String ARGUMENTS = "dop partInputPath partSuppInputPath ordersInputPath lineItemInputPath supplierInputPath nationInputPath outputPath";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
index 0c5bad7..415fde9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
@@ -57,7 +57,7 @@ import org.apache.flink.util.Collector;
  * GROUP BY c_mktsegment;
  * 
  */
-
+@SuppressWarnings("deprecation")
 public class TPCHQueryAsterix implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/WebLogAnalysis.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/WebLogAnalysis.java
index b02a686..45889ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/WebLogAnalysis.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/WebLogAnalysis.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational;
 
 import java.io.Serializable;
@@ -81,6 +80,7 @@ import org.apache.flink.util.Collector;
  * </pre></code>
  * 
  */
+@SuppressWarnings("deprecation")
 public class WebLogAnalysis implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/GroupByReturnFlag.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/GroupByReturnFlag.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/GroupByReturnFlag.java
index bad7878..d7aab4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/GroupByReturnFlag.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/GroupByReturnFlag.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query1Util;
 
 import java.util.Iterator;
@@ -28,6 +27,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class GroupByReturnFlag extends ReduceFunction {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/LineItemFilter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/LineItemFilter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/LineItemFilter.java
index 4d1c830..be54524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/LineItemFilter.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query1Util/LineItemFilter.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query1Util;
 
 import java.text.DateFormat;
@@ -38,8 +37,8 @@ import org.slf4j.LoggerFactory;
  * TODO: add parametrisation; first version uses a static interval = 90
  * 
  * In prepration of the following reduce step (see {@link GroupByReturnFlag}) the key has to be set to &quot;return flag&quot;
- * 
  */
+@SuppressWarnings("deprecation")
 public class LineItemFilter extends MapFunction {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/AmountAggregate.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/AmountAggregate.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/AmountAggregate.java
index 564675d..638877f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/AmountAggregate.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/AmountAggregate.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
-
 import java.util.Iterator;
 
 import org.apache.flink.api.java.record.functions.ReduceFunction;
@@ -27,7 +25,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
 public class AmountAggregate extends ReduceFunction {
 	
 	private StringValue value = new StringValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/FilteredPartsJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/FilteredPartsJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/FilteredPartsJoin.java
index 28d7257..a391684 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/FilteredPartsJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/FilteredPartsJoin.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
-
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.test.recordJobs.util.Tuple;
 import org.apache.flink.types.IntValue;
@@ -27,7 +25,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "serial", "deprecation" })
 public class FilteredPartsJoin extends JoinFunction {
 	
 	private final IntPair partAndSupplierKey = new IntPair();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/LineItemMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/LineItemMap.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/LineItemMap.java
index f1055a7..6a862b8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/LineItemMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/LineItemMap.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.MapFunction;
@@ -24,7 +23,7 @@ import org.apache.flink.test.recordJobs.util.Tuple;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class LineItemMap extends MapFunction {
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderMap.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderMap.java
index d3b46e6..ee10333 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderMap.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.MapFunction;
@@ -25,7 +24,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class OrderMap extends MapFunction {
 	
 	private final Tuple inputTuple = new Tuple();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderedPartsJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderedPartsJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderedPartsJoin.java
index 59a5f3b..2e6453c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderedPartsJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/OrderedPartsJoin.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.JoinFunction;
@@ -25,7 +24,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class OrderedPartsJoin extends JoinFunction {
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartFilter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartFilter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartFilter.java
index b5be55b..1e9a920 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartFilter.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartFilter.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.MapFunction;
@@ -25,7 +24,7 @@ import org.apache.flink.types.NullValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class PartFilter extends MapFunction {
 
 	private final Tuple inputTuple = new Tuple();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartJoin.java
index f424c83..67f2f6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartJoin.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.JoinFunction;
@@ -26,7 +25,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class PartJoin extends JoinFunction {
 	
 	private final Tuple partSuppValue = new Tuple();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartListJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartListJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartListJoin.java
index 65b4595..66860ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartListJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartListJoin.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
-
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class PartListJoin extends JoinFunction {
 
 	private final StringIntPair amountYearPair = new StringIntPair();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartsuppMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartsuppMap.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartsuppMap.java
index 399fec4..32c1d64 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartsuppMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/PartsuppMap.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.MapFunction;
@@ -24,7 +23,7 @@ import org.apache.flink.test.recordJobs.util.Tuple;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class PartsuppMap extends MapFunction {
 	
 	private Tuple inputTuple = new Tuple();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SupplierMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SupplierMap.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SupplierMap.java
index 1679226..2b43531 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SupplierMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SupplierMap.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
 import org.apache.flink.api.java.record.functions.MapFunction;
@@ -25,7 +24,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class SupplierMap extends MapFunction {
 	
 	private IntValue suppKey = new IntValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SuppliersJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SuppliersJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SuppliersJoin.java
index 4103c45..f125237 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SuppliersJoin.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/query9Util/SuppliersJoin.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.relational.query9Util;
 
-
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.test.recordJobs.util.Tuple;
 import org.apache.flink.types.IntValue;
@@ -27,7 +25,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "deprecation", "serial" })
 public class SuppliersJoin extends JoinFunction {
 	
 	private IntValue suppKey = new IntValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/ReduceGroupSort.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/ReduceGroupSort.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/ReduceGroupSort.java
index 4065d4a..b4dc1b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/ReduceGroupSort.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/ReduceGroupSort.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.sort;
 
 import java.io.Serializable;
@@ -42,6 +41,7 @@ import org.apache.flink.util.Collector;
  * This job shows how to define ordered input for a Reduce contract.
  * The inputs for CoGroups can be (individually) ordered as well.  
  */
+@SuppressWarnings("deprecation")
 public class ReduceGroupSort implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java
index 7207c78..860e6b9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.sort;
 
 import org.apache.flink.api.common.Plan;
@@ -38,6 +37,7 @@ import org.apache.flink.test.recordJobs.sort.tsUtil.TeraOutputFormat;
  * href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/examples/terasort/TeraGen.html">
  * http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/examples/terasort/TeraGen.html</a>.
  */
+@SuppressWarnings("deprecation")
 public final class TeraSort implements Program, ProgramDescription {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
index 49a73a1..53b2663 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.wordcount;
 
 import java.util.Iterator;
@@ -46,6 +45,7 @@ import org.apache.flink.util.Collector;
  * Implements a word count which takes the input file and counts the number of
  * the occurrences of each word in the file.
  */
+@SuppressWarnings("deprecation")
 public class WordCount implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de7f478f/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
index cf21d06..9054a87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobs.wordcount;
 
 import java.io.IOException;
@@ -57,6 +56,7 @@ import org.apache.flink.util.Collector;
  * This is similar to the WordCount example and additionally demonstrates how to
  * use custom accumulators (built-in or custom).
  */
+@SuppressWarnings("deprecation")
 public class WordCountAccumulators implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;


[4/4] incubator-flink git commit: [FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

Posted by se...@apache.org.
[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

This closes #252


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/13968cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/13968cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/13968cd4

Branch: refs/heads/master
Commit: 13968cd4de446b4f565a094554380eb8559b6cf9
Parents: de7f478
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Dec 5 19:19:29 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:38:32 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopIOFormatsITCase.java           | 230 +++++++++++++++++++
 flink-java/pom.xml                              |  35 +++
 .../typeutils/runtime/WritableSerializer.java   |   4 +
 .../java/org/apache/hadoop/io/Writable.java     | 105 ---------
 4 files changed, 269 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..6ef0f2e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String[] resultPath;
+	private String[] expectedResult;
+	private String sequenceFileInPath;
+	private String sequenceFileInPathNull;
+
+	public HadoopIOFormatsITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+		File sequenceFile = createAndRegisterTempFile("seqFile");
+		sequenceFileInPath = sequenceFile.toURI().toString();
+
+		// Create a sequence file
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+		Path path = new Path(sequenceFile.getAbsolutePath());
+
+		//  ------------------ Long / Text Key Value pair: ------------
+		int kvCount = 4;
+
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		SequenceFile.Writer writer = null;
+		try {
+			writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i ++) {
+				if(i == 1) {
+					// write key = 0 a bit more often.
+					for(int a = 0;a < 15; a++) {
+						key.set(i);
+						value.set(i+" - somestring");
+						writer.append(key, value);
+					}
+				}
+				key.set(i);
+				value.set(i+" - somestring");
+				writer.append(key, value);
+			}
+		} finally {
+			IOUtils.closeStream(writer);
+		}
+
+
+		//  ------------------ Long / Text Key Value pair: ------------
+
+		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+		path = new Path(sequenceFileInPathNull);
+
+		LongWritable value1 = new LongWritable();
+		SequenceFile.Writer writer1 = null;
+		try {
+			writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i ++) {
+				value1.set(i);
+				writer1.append(NullWritable.get(), value1);
+			}
+		} finally {
+			IOUtils.closeStream(writer1);
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		for(int i = 0; i < resultPath.length; i++) {
+			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+		}
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class HadoopIOFormatPrograms {
+		
+		public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/**
+				 * Test sequence file, including a key access.
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+					@Override
+					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+					}
+				}).sum(0);
+				sumed.writeAsText(resultPath[0]);
+				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+					@Override
+					public String map(Tuple2<LongWritable, Text> value) throws Exception {
+						return value.f1 + " - " + value.f0.get();
+					}
+				});
+				res.writeAsText(resultPath[1]);
+				env.execute();
+				
+				// return expected result
+				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+						"1 - somestring - 1\n" +
+						"2 - somestring - 2\n" +
+						"3 - somestring - 3\n"};
+
+			}
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+					@Override
+					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+						return new Tuple2<Void, Long>(null, value.f1.get());
+					}
+				});
+				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+				res1.writeAsText(resultPath[1]);
+				res.writeAsText(resultPath[0]);
+				env.execute();
+
+				// return expected result
+				return 	new String [] {"(null,2)\n" +
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,3)",
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,2)\n" +
+						"(null,3)"};
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 4ae2cc3..21a0b68 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -86,4 +86,39 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
+
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+				</property>
+			</activation>
+			<dependencies>
+				<!-- "Old" Hadoop = MapReduce v1 -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 0fe8fdf..e838d27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -44,6 +45,9 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	
 	@Override
 	public T createInstance() {
+		if(typeClass == NullWritable.class) {
+			return (T) NullWritable.get();
+		}
 		return InstantiationUtil.instantiate(typeClass);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java b/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
deleted file mode 100644
index 16efe7f..0000000
--- a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ================================================================================================
-//                                 !!! NOTICE !!!
-// 
-//  This interface has been directly copied from the Apache Hadoop project.
-//  It has been added to this project to allow compiling against the type "Writable"
-//  without adding the heavyweight Hadoop dependency. This keeps the project dependencies 
-//  lightweight.
-//
-//  At runtime, the JVM will load either this interface, or the interface from a Hadoop jar,
-//  if present. In both cases, the dynamic class loading, linking, and method lookup will
-//  allow the types to interoperate as long as package name, class name, and method signature
-//  of this interface are kept strictly in sync with the version packaged with Hadoop.
-//
-//  This is a core interface of the Hadoop project and has been stable across all releases.
-//
-// ================================================================================================
-
-package org.apache.hadoop.io;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-
-
-/**
- * A serializable object which implements a simple, efficient, serialization 
- * protocol, based on {@link DataInput} and {@link DataOutput}.
- *
- * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
- * framework implements this interface.</p>
- * 
- * <p>Implementations typically implement a static <code>read(DataInput)</code>
- * method which constructs a new instance, calls {@link #readFields(DataInput)} 
- * and returns the instance.</p>
- * 
- * <p>Example:</p>
- * <p><blockquote><pre>
- *     public class MyWritable implements Writable {
- *       // Some data
- *       private int counter;
- *       private long timestamp;
- *
- *       // Default constructor to allow (de)serialization
- *       MyWritable() { }
- *
- *       public void write(DataOutput out) throws IOException {
- *         out.writeInt(counter);
- *         out.writeLong(timestamp);
- *       }
- *
- *       public void readFields(DataInput in) throws IOException {
- *         counter = in.readInt();
- *         timestamp = in.readLong();
- *       }
- *
- *       public static MyWritable read(DataInput in) throws IOException {
- *         MyWritable w = new MyWritable();
- *         w.readFields(in);
- *         return w;
- *       }
- *     }
- * </pre></blockquote></p>
- */
-public interface Writable {
-	/**
-	 * Serialize the fields of this object to <code>out</code>.
-	 * 
-	 * @param out
-	 *            <code>DataOuput</code> to serialize this object into.
-	 * @throws IOException
-	 */
-	void write(DataOutput out) throws IOException;
-
-	/**
-	 * Deserialize the fields of this object from <code>in</code>.
-	 * 
-	 * <p>
-	 * For efficiency, implementations should attempt to re-use storage in the
-	 * existing object where possible.
-	 * </p>
-	 * 
-	 * @param in
-	 *            <code>DataInput</code> to deseriablize this object from.
-	 * @throws IOException
-	 */
-	void readFields(DataInput in) throws IOException;
-}


[2/4] incubator-flink git commit: [scala] Change ScalaAggregateOperator to use TypeSerializer

Posted by se...@apache.org.
[scala] Change ScalaAggregateOperator to use TypeSerializer

This closes #263


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9e403667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9e403667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9e403667

Branch: refs/heads/master
Commit: 9e40366752ba958c562fca69f2afdf6a8ca54b2e
Parents: 0028238
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Dec 11 13:07:06 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Dec 14 16:09:55 2014 +0100

----------------------------------------------------------------------
 .../scala/operators/ScalaAggregateOperator.java | 29 ++++----------------
 1 file changed, 6 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e403667/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 293b380..d352817 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -39,8 +38,6 @@ import org.apache.flink.api.java.operators.Grouping;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -164,18 +161,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 		}
 		genName.setLength(genName.length()-1);
 
-		TypeSerializer<IN> serializer = getInputType().createSerializer();
-		TypeSerializerFactory<IN> serializerFactory;
-		if (serializer.isStateful()) {
-			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
-					serializer, getInputType().getTypeClass());
-		} else {
-			serializerFactory = new RuntimeStatelessSerializerFactory<IN>(
-					serializer, getInputType().getTypeClass());
-		}
-
 		@SuppressWarnings("rawtypes")
-		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(serializerFactory, aggFunctions, fields);
+		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType().createSerializer(), aggFunctions, fields);
 
 
 		String name = getName() != null ? getName() : genName.toString();
@@ -251,17 +238,14 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 		private final AggregationFunction<Object>[] aggFunctions;
 
-		private final TypeSerializerFactory<T> serializerFactory;
+		private TupleSerializerBase<T> serializer;
 
-		private transient TupleSerializerBase<T> serializer;
-
-		public AggregatingUdf(TypeSerializerFactory<T> serializerFactory, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
-			Validate.notNull(serializerFactory);
+		public AggregatingUdf(TypeSerializer<T> serializer, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
+			Validate.notNull(serializer);
 			Validate.notNull(aggFunctions);
 			Validate.isTrue(aggFunctions.length == fieldPositions.length);
-			Validate.isTrue(serializerFactory.getSerializer() instanceof TupleSerializerBase);
-
-			this.serializerFactory = serializerFactory;
+			Validate.isInstanceOf(TupleSerializerBase.class, serializer, "Serializer for Scala Aggregate Operator must be a tuple serializer.");
+			this.serializer = (TupleSerializerBase<T>) serializer;
 			this.aggFunctions = aggFunctions;
 			this.fieldPositions = fieldPositions;
 		}
@@ -272,7 +256,6 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 			for (AggregationFunction<Object> aggFunction : aggFunctions) {
 				aggFunction.initializeAggregate();
 			}
-			serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
 		}
 
 		@Override