Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:55 UTC

[12/16] flink git commit: [FLINK-2901] Remove Record API dependencies from flink-tests #2

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
deleted file mode 100644
index dc52158..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.CountEdges;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.JoinCountsAndUniquify;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.ProjectEdge;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.BuildTriads;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.CloseTriads;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.ProjectOutCounts;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.ProjectToLowerDegreeVertex;
-import org.apache.flink.test.recordJobs.graph.triangleEnumUtil.EdgeInputFormat;
-import org.apache.flink.test.recordJobs.graph.triangleEnumUtil.TriangleOutputFormat;
-import org.apache.flink.types.IntValue;
-
-/**
- * An implementation of the triangle enumeration, which includes the pre-processing step
- * to compute the degrees of the vertices and to select the lower-degree vertex for the
- * enumeration of open triads.
- */
-@SuppressWarnings("deprecation")
-public class EnumTrianglesWithDegrees implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		final int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		final String edgeInput = args.length > 1 ? args[1] : "";
-		final String output    = args.length > 2 ? args[2] : "";
-		final char delimiter   = args.length > 3 ? (char) Integer.parseInt(args[3]) : ',';
-		
-
-		FileDataSource edges = new FileDataSource(new EdgeInputFormat(), edgeInput, "Input Edges");
-		edges.setParameter(EdgeInputFormat.ID_DELIMITER_CHAR, delimiter);
-
-		// =========================== Vertex Degree ============================
-		
-		MapOperator projectEdge = MapOperator.builder(new ProjectEdge())
-				.input(edges).name("Project Edge").build();
-		
-		ReduceOperator edgeCounter = ReduceOperator.builder(new CountEdges(), IntValue.class, 0)
-				.input(projectEdge).name("Count Edges for Vertex").build();
-		
-		ReduceOperator countJoiner = ReduceOperator.builder(new JoinCountsAndUniquify(), IntValue.class, 0)
-				.keyField(IntValue.class, 1)
-				.input(edgeCounter).name("Join Counts").build();
-		
-		
-		// =========================== Triangle Enumeration ============================
-		
-		MapOperator toLowerDegreeEdge = MapOperator.builder(new ProjectToLowerDegreeVertex())
-				.input(countJoiner).name("Select lower-degree Edge").build();
-		
-		MapOperator projectOutCounts = MapOperator.builder(new ProjectOutCounts())
-				.input(countJoiner).name("Project out Counts").build();
-
-		ReduceOperator buildTriads = ReduceOperator.builder(new BuildTriads(), IntValue.class, 0)
-				.input(toLowerDegreeEdge).name("Build Triads").build();
-
-		JoinOperator closeTriads = JoinOperator.builder(new CloseTriads(), IntValue.class, 1, 0)
-				.keyField(IntValue.class, 2, 1)
-				.input1(buildTriads).input2(projectOutCounts)
-				.name("Close Triads").build();
-		closeTriads.setParameter("INPUT_SHIP_STRATEGY", "SHIP_REPARTITION_HASH");
-		closeTriads.setParameter("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");
-
-		FileDataSink triangles = new FileDataSink(new TriangleOutputFormat(), output, closeTriads, "Triangles");
-
-		Plan p = new Plan(triangles, "Enumerate Triangles");
-		p.setDefaultParallelism(numSubTasks);
-		return p;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubTasks] [input file] [output file] [delimiter]";
-	}
-}

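For reference, the vertex-degree preprocessing that the deleted program wires up maps naturally onto the DataSet API that replaces the Record API. A minimal sketch, assuming comma-delimited edges of positive integer vertex IDs; the class name and input path handling are illustrative, not part of this commit:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class VertexDegreeSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read (v1, v2) edges from a comma-delimited file (path is an assumption)
        DataSet<Tuple2<Integer, Integer>> edges = env.readCsvFile(args[0])
                .fieldDelimiter(",")
                .types(Integer.class, Integer.class);

        // each edge contributes one to the degree of both of its endpoints
        DataSet<Tuple2<Integer, Integer>> degrees = edges
                .flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public void flatMap(Tuple2<Integer, Integer> edge, Collector<Tuple2<Integer, Integer>> out) {
                        out.collect(new Tuple2<Integer, Integer>(edge.f0, 1));
                        out.collect(new Tuple2<Integer, Integer>(edge.f1, 1));
                    }
                })
                .groupBy(0)
                .sum(1); // (vertexId, degree)

        degrees.print();
    }
}
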
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
deleted file mode 100644
index 34d4b60..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * Implementation of the Pairwise Shortest Path example PACT program.
- * The program implements one iteration of the algorithm and must be run multiple times until no changes are computed.
- * 
- * The pairwise shortest path algorithm comes from the domain of graph problems. The goal is to find all shortest paths
- * between any two transitively connected nodes in a graph. In this implementation edges are interpreted as directed and weighted.
- * 
- * For the first iteration, the program allows two input formats:
- * 1) RDF triples with foaf:knows predicates. A triple is interpreted as an edge from the RDF subject to the RDF object with weight 1.
- * 2) The program's text-serialization for paths (see {@link PathInFormat} and {@link PathOutFormat}). 
- * 
- * The RDF input format is used if the 4th parameter of the getPlan() method is set to "true". If set to "false" the path input format is used. 
- */
-@SuppressWarnings("deprecation")
-public class PairwiseSP implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Reads RDF triples and filters on the foaf:knows RDF predicate. The triple's elements must be separated by whitespace.
-	 * The foaf:knows RDF predicate indicates that the RDF subject knows the object (typically of type foaf:person).
-	 * The connections between people are extracted and handled as graph edges. For the Pairwise Shortest Path algorithm the 
-	 * connection is interpreted as a directed edge, i.e. subject knows object, but the object does not necessarily know the subject.
-	 * 
-	 * The RDFTripleInFormat filters all RDF triples with foaf:knows predicates. 
-	 * For each triple with foaf:knows predicate, a record is emitted with 
-	 * - from-node being the RDF subject at field position 0,
-	 * - to-node being the RDF object at field position 1,
-	 * - length being 1 at field position 2,
-	 * - hop count being 0 at field position 3, and hopList being an empty string at field position 4. 
-	 *
-	 */
-	public static class RDFTripleInFormat extends DelimitedInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		private final StringValue fromNode = new StringValue();
-		private final StringValue toNode = new StringValue();
-		private final IntValue pathLength = new IntValue(1);
-		private final IntValue hopCnt = new IntValue(0);
-		private final StringValue hopList = new StringValue(" ");
-		
-		@Override
-		public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-			String lineStr = new String(bytes, offset, numBytes);
-			// collapse repeated whitespace and trim
-			lineStr = lineStr.replaceAll("\\s+", " ").trim();
-			// build whitespace tokenizer
-			StringTokenizer st = new StringTokenizer(lineStr, " ");
-
-			// line must have at least three elements
-			if (st.countTokens() < 3) {
-				return null;
-			}
-
-			String rdfSubj = st.nextToken();
-			String rdfPred = st.nextToken();
-			String rdfObj = st.nextToken();
-
-			// we only want foaf:knows predicates
-			if (!rdfPred.equals("<http://xmlns.com/foaf/0.1/knows>")) {
-				return null;
-			}
-
-			// build node pair from subject and object
-			fromNode.setValue(rdfSubj);
-			toNode.setValue(rdfObj);
-						
-			target.setField(0, fromNode);
-			target.setField(1, toNode);
-			target.setField(2, pathLength);
-			target.setField(3, hopCnt);
-			target.setField(4, hopList);
-
-			return target;
-		}
-	}
-	
-	/**
-	 * The PathInFormat reads paths consisting of a from-node, a to-node, a length, a hop count, and a hop list serialized as a string.
-	 * All five elements of the path must be separated by the pipe character ('|') and may not themselves contain any pipe characters.
-	 * 
-	 * PathInFormat returns records with:
-	 * - from-node at field position 0,
-	 * - to-node at field position 1,
-	 * - length at field position 2,
-	 * - hop count at field position 3, and hop list at field position 4. 
-	 */
-	public static class PathInFormat extends DelimitedInputFormat {
-		private static final long serialVersionUID = 1L;
-		
-		private final StringValue fromNode = new StringValue();
-		private final StringValue toNode = new StringValue();
-		private final IntValue length = new IntValue();
-		private final IntValue hopCnt = new IntValue();
-		private final StringValue hopList = new StringValue();
-
-		@Override
-		public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-			String lineStr = new String(bytes, offset, numBytes);
-			StringTokenizer st = new StringTokenizer(lineStr, "|");
-			
-			// path must have exactly 5 tokens (fromNode, toNode, length, hopCnt, hopList)
-			if (st.countTokens() != 5) {
-				return null;
-			}
-			
-			this.fromNode.setValue(st.nextToken());
-			this.toNode.setValue(st.nextToken());
-			this.length.setValue(Integer.parseInt(st.nextToken()));
-			this.hopCnt.setValue(Integer.parseInt(st.nextToken()));
-			this.hopList.setValue(st.nextToken());
-
-			target.setField(0, fromNode);
-			target.setField(1, toNode);
-			target.setField(2, length);
-			target.setField(3, hopCnt);
-			target.setField(4, hopList);
-			
-			return target;
-		}
-	}
-
-	/**
-	 * The PathOutFormat serializes paths to text. 
-	 * In order, the from-node, the to-node, the length, the hop count, and the hop list are written out.
-	 * Elements are separated by the pipe character ('|').  
-	 * 
-	 *
-	 */
-	public static class PathOutFormat extends FileOutputFormat {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			StringBuilder line = new StringBuilder();
-
-			// append from-node
-			line.append(record.getField(0, StringValue.class).toString());
-			line.append("|");
-			// append to-node
-			line.append(record.getField(1, StringValue.class).toString());
-			line.append("|");
-			// append length
-			line.append(record.getField(2, IntValue.class).toString());
-			line.append("|");
-			// append hopCnt
-			line.append(record.getField(3, IntValue.class).toString());
-			line.append("|");
-			// append hopList
-			line.append(record.getField(4, StringValue.class).toString());
-			line.append("|");
-			line.append("\n");
-
-			stream.write(line.toString().getBytes());
-		}
-	}
-
-	/**
-	 * Concatenates two paths where the from-node of the first path and the to-node of the second path are the same.
-	 * The second input path becomes the first part and the first input path the second part of the output path.
-	 * The length of the output path is the sum of both input paths. 
-	 * The output path's hops list is built from both path's hops lists and the common node.  
-	 */
-	@ConstantFieldsFirst(1)
-	@ConstantFieldsSecond(0)
-	public static class ConcatPaths extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final Record outputRecord = new Record();
-		
-		private final IntValue length = new IntValue();
-		private final IntValue hopCnt = new IntValue();
-		private final StringValue hopList = new StringValue();
-		
-		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
-
-			// rec1 has matching start, rec2 matching end
-			// Therefore, rec2's end node and rec1's start node are identical
-			// First half of new path will be rec2, second half will be rec1
-			
-			// Get from-node and to-node of new path  
-			final StringValue fromNode = rec2.getField(0, StringValue.class);
-			final StringValue toNode = rec1.getField(1, StringValue.class);
-			
-			// Check whether from-node = to-node to prevent cycles!
-			if (fromNode.equals(toNode)) {
-				return;
-			}
-
-			// Create new path
-			outputRecord.setField(0, fromNode);
-			outputRecord.setField(1, toNode);
-			
-			// Compute length of new path
-			length.setValue(rec1.getField(2, IntValue.class).getValue() + rec2.getField(2, IntValue.class).getValue());
-			outputRecord.setField(2, length);
-			
-			// compute hop count
-			int hops = rec1.getField(3, IntValue.class).getValue() + 1 + rec2.getField(3, IntValue.class).getValue();
-			hopCnt.setValue(hops);
-			outputRecord.setField(3, hopCnt);
-			
-			// Concatenate hops lists and insert matching node
-			StringBuilder sb = new StringBuilder();
-			// first path
-			sb.append(rec2.getField(4, StringValue.class).getValue());
-			sb.append(" ");
-			// common node
-			sb.append(rec1.getField(0, StringValue.class).getValue());
-			// second path
-			sb.append(" ");
-			sb.append(rec1.getField(4, StringValue.class).getValue());
-						
-			hopList.setValue(sb.toString().trim());
-			outputRecord.setField(4, hopList);
-						
-			out.collect(outputRecord);
-		}
-	}
-
-	/**
-	 * Receives two sets of paths and, for each from-node/to-node combination, emits the shortest path(s).
-	 * If more than one shortest path exists for a combination, all of them are emitted. 
-	 * 
-	 *
-	 */
-	@ConstantFieldsFirst({0,1})
-	@ConstantFieldsSecond({0,1})
-	public static class FindShortestPath extends CoGroupFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final Record outputRecord = new Record();
-		
-		private final Set<StringValue> shortestPaths = new HashSet<StringValue>();
-		private final Map<StringValue,IntValue> hopCnts = new HashMap<StringValue,IntValue>();
-		private final IntValue minLength = new IntValue();
-		
-		@Override
-		public void coGroup(Iterator<Record> inputRecords, Iterator<Record> concatRecords, Collector<Record> out) {
-
-			// init minimum length and minimum path
-			Record pathRec = null;
-			StringValue path = null;
-			if(inputRecords.hasNext()) {
-				// path is in input paths
-				pathRec = inputRecords.next();
-			} else {
-				// path must be in concat paths
-				pathRec = concatRecords.next();
-			}
-			// get from node (common for all paths)
-			StringValue fromNode = pathRec.getField(0, StringValue.class);
-			// get to node (common for all paths)
-			StringValue toNode = pathRec.getField(1, StringValue.class);
-			// get length of path
-			minLength.setValue(pathRec.getField(2, IntValue.class).getValue());
-			// store path and hop count
-			path = new StringValue(pathRec.getField(4, StringValue.class));
-			shortestPaths.add(path);
-			hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-						
-			// find shortest path of all input paths
-			while (inputRecords.hasNext()) {
-				pathRec = inputRecords.next();
-				IntValue length = pathRec.getField(2, IntValue.class);
-				
-				if (length.getValue() == minLength.getValue()) {
-					// path also has minimum length, add it to the list
-					path = new StringValue(pathRec.getField(4, StringValue.class));
-					if(shortestPaths.add(path)) {
-						hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-					}
-				} else if (length.getValue() < minLength.getValue()) {
-					// path is shorter: new minimum length
-					minLength.setValue(length.getValue());
-					// clear lists
-					hopCnts.clear();
-					shortestPaths.clear();
-					// get path and add path and hop count
-					path = new StringValue(pathRec.getField(4, StringValue.class));
-					shortestPaths.add(path);
-					hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-				}
-			}
-
-			// find shortest path of all input and concatenated paths
-			while (concatRecords.hasNext()) {
-				pathRec = concatRecords.next();
-				IntValue length = pathRec.getField(2, IntValue.class);
-				
-				if (length.getValue() == minLength.getValue()) {
-					// path also has minimum length, add it to the list
-					path = new StringValue(pathRec.getField(4, StringValue.class));
-					if(shortestPaths.add(path)) {
-						hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-					}
-				} else if (length.getValue() < minLength.getValue()) {
-					// path is shorter: new minimum length
-					minLength.setValue(length.getValue());
-					// clear lists
-					hopCnts.clear();
-					shortestPaths.clear();
-					// get path and add path and hop count
-					path = new StringValue(pathRec.getField(4, StringValue.class));
-					shortestPaths.add(path);
-					hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-				}
-			}
-			
-			outputRecord.setField(0, fromNode);
-			outputRecord.setField(1, toNode);
-			outputRecord.setField(2, minLength);
-			
-			// emit all shortest paths
-			for(StringValue shortestPath : shortestPaths) {
-				outputRecord.setField(3, hopCnts.get(shortestPath));
-				outputRecord.setField(4, shortestPath);
-				out.collect(outputRecord);
-			}
-									
-			hopCnts.clear();
-			shortestPaths.clear();
-			
-		}
-	}
-
-	/**
-	 * Assembles the Plan of the Pairwise Shortest Paths example Pact program.
-	 * The program computes one iteration of the Pairwise Shortest Paths algorithm.
-	 * 
-	 * For the first iteration, two input formats can be chosen:
-	 * 1) RDF triples with foaf:knows predicates
-	 * 2) Text-serialized paths (see PathInFormat and PathOutFormat)
-	 * 
-	 * To choose 1) set the fourth parameter to "true". If set to "false" 2) will be used.
-	 *
-	 */
-	@Override
-	public Plan getPlan(String... args) {
-
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String paths     = (args.length > 1 ? args[1] : "");
-		String output    = (args.length > 2 ? args[2] : "");
-		boolean rdfInput = (args.length > 3 && Boolean.parseBoolean(args[3]));
-
-		FileDataSource pathsInput;
-		
-		if(rdfInput) {
-			pathsInput = new FileDataSource(new RDFTripleInFormat(), paths, "RDF Triples");
-		} else {
-			pathsInput = new FileDataSource(new PathInFormat(), paths, "Paths");
-		}
-		pathsInput.setParallelism(numSubTasks);
-
-		JoinOperator concatPaths = 
-				JoinOperator.builder(new ConcatPaths(), StringValue.class, 0, 1)
-			.name("Concat Paths")
-			.build();
-
-		concatPaths.setParallelism(numSubTasks);
-
-		CoGroupOperator findShortestPaths = 
-				CoGroupOperator.builder(new FindShortestPath(), StringValue.class, 0, 0)
-			.keyField(StringValue.class, 1, 1)
-			.name("Find Shortest Paths")
-			.build();
-		findShortestPaths.setParallelism(numSubTasks);
-
-		FileDataSink result = new FileDataSink(new PathOutFormat(),output, "New Paths");
-		result.setParallelism(numSubTasks);
-
-		result.setInput(findShortestPaths);
-		findShortestPaths.setFirstInput(pathsInput);
-		findShortestPaths.setSecondInput(concatPaths);
-		concatPaths.setFirstInput(pathsInput);
-		concatPaths.setSecondInput(pathsInput);
-
-		return new Plan(result, "Pairwise Shortest Paths");
-
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubTasks], [inputPaths], [outputPaths], [RDFInputFlag]";
-	}
-}

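The self-join that ConcatPaths implements above translates one-to-one into the DataSet API. A minimal sketch of the concatenation step, assuming paths is an existing DataSet of (from, to, length) tuples and leaving out the hop-count and hop-list bookkeeping; all names are illustrative:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// paths: DataSet<Tuple3<String, String, Integer>> of (from, to, length), assumed given
DataSet<Tuple3<String, String, Integer>> concatenated = paths
        .join(paths)
        .where(0).equalTo(1) // left path's from-node meets right path's to-node
        .with(new FlatJoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
            @Override
            public void join(Tuple3<String, String, Integer> rec1, Tuple3<String, String, Integer> rec2,
                    Collector<Tuple3<String, String, Integer>> out) {
                // the new path runs rec2 first, then rec1; drop cycles (from == to)
                if (!rec2.f0.equals(rec1.f1)) {
                    out.collect(new Tuple3<String, String, Integer>(rec2.f0, rec1.f1, rec1.f2 + rec2.f2));
                }
            }
        });
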
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
deleted file mode 100644
index 0dbb20a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class WorksetConnectedComponents implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-	public static final class DuplicateLongMap extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			record.setField(1, record.getField(0, LongValue.class));
-			out.collect(record);
-		}
-	}
-	
-	/**
-	 * UDF that joins a (Vertex-ID, Component-ID) pair, which represents the current component
-	 * a vertex is associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function
-	 * produces a (Target-Vertex-ID, Component-ID) pair.
-	 */
-	public static final class NeighborWithComponentIDJoin extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final Record result = new Record();
-
-		@Override
-		public void join(Record vertexWithComponent, Record edge, Collector<Record> out) {
-			this.result.setField(0, edge.getField(1, LongValue.class));
-			this.result.setField(1, vertexWithComponent.getField(1, LongValue.class));
-			out.collect(this.result);
-		}
-	}
-	
-	/**
-	 * Minimum aggregation over (Vertex-ID, Component-ID) pairs, selecting the pair with the smallest Component-ID.
-	 */
-	@Combinable
-	@ConstantFields(0)
-	public static final class MinimumComponentIDReduce extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final Record result = new Record();
-		private final LongValue vertexId = new LongValue();
-		private final LongValue minComponentId = new LongValue();
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) {
-
-			final Record first = records.next();
-			final long vertexID = first.getField(0, LongValue.class).getValue();
-			
-			long minimumComponentID = first.getField(1, LongValue.class).getValue();
-
-			while (records.hasNext()) {
-				long candidateComponentID = records.next().getField(1, LongValue.class).getValue();
-				if (candidateComponentID < minimumComponentID) {
-					minimumComponentID = candidateComponentID;
-				}
-			}
-			
-			this.vertexId.setValue(vertexID);
-			this.minComponentId.setValue(minimumComponentID);
-			this.result.setField(0, this.vertexId);
-			this.result.setField(1, this.minComponentId);
-			out.collect(this.result);
-		}
-	}
-	
-	/**
-	 * UDF that joins a candidate (Vertex-ID, Component-ID) pair with another (Vertex-ID, Component-ID) pair.
-	 * Returns the candidate pair, if the candidate's Component-ID is smaller.
-	 */
-	@ConstantFieldsFirst(0)
-	public static final class UpdateComponentIdMatch extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out){
-	
-			long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
-			long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
-	
-			if (candidateComponentID < currentComponentID) {
-				out.collect(newVertexWithComponent);
-			}
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		final String verticesInput = (args.length > 1 ? args[1] : "");
-		final String edgeInput = (args.length > 2 ? args[2] : "");
-		final String output = (args.length > 3 ? args[3] : "");
-		final int maxIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
-		// data source for initial vertices
-		FileDataSource initialVertices = new FileDataSource(new CsvInputFormat(' ', LongValue.class), verticesInput, "Vertices");
-		
-		MapOperator verticesWithId = MapOperator.builder(DuplicateLongMap.class).input(initialVertices).name("Assign Vertex Ids").build();
-		
-		// the loop takes the vertices as the solution set and changed vertices as the workset
-		// initially, all vertices are changed
-		DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
-		iteration.setInitialSolutionSet(verticesWithId);
-		iteration.setInitialWorkset(verticesWithId);
-		iteration.setMaximumNumberOfIterations(maxIterations);
-		
-		// data source for the edges
-		FileDataSource edges = new FileDataSource(new CsvInputFormat(' ', LongValue.class, LongValue.class), edgeInput, "Edges");
-
-		// join workset (changed vertices) with the edges to propagate changes to neighbors
-		JoinOperator joinWithNeighbors = JoinOperator.builder(new NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(edges)
-				.name("Join Candidate Id With Neighbor")
-				.build();
-
-		// find for each neighbor the smallest of all candidates
-		ReduceOperator minCandidateId = ReduceOperator.builder(new MinimumComponentIDReduce(), LongValue.class, 0)
-				.input(joinWithNeighbors)
-				.name("Find Minimum Candidate Id")
-				.build();
-		
-		// join candidates with the solution set and update if the candidate component-id is smaller
-		JoinOperator updateComponentId = JoinOperator.builder(new UpdateComponentIdMatch(), LongValue.class, 0, 0)
-				.input1(minCandidateId)
-				.input2(iteration.getSolutionSet())
-				.name("Update Component Id")
-				.build();
-		
-		iteration.setNextWorkset(updateComponentId);
-		iteration.setSolutionSetDelta(updateComponentId);
-
-		// sink is the iteration result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, iteration, "Result");
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(LongValue.class, 0)
-			.field(LongValue.class, 1);
-
-		Plan plan = new Plan(result, "Workset Connected Components");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <numberOfSubTasks> <vertices> <edges> <out> <maxIterations>";
-	}
-}

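The delta-iteration structure above has a direct DataSet API counterpart, which the Flink examples use for connected components. A minimal sketch, assuming verticesWithId and edges are existing DataSet<Tuple2<Long, Long>> values of (vertexId, componentId) and (source, target) respectively, and maxIterations is given; names are illustrative:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// solution set and initial workset are both the (vertexId, componentId) pairs
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithId.iterateDelta(verticesWithId, maxIterations, 0);

DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // propagate each changed component id across edges to the target vertex
        .join(edges).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> join(Tuple2<Long, Long> vertex, Tuple2<Long, Long> edge) {
                return new Tuple2<Long, Long>(edge.f1, vertex.f1);
            }
        })
        // keep only the smallest candidate component id per vertex
        .groupBy(0).min(1)
        // compare with the solution set and forward only real improvements
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .with(new FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current,
                    Collector<Tuple2<Long, Long>> out) {
                if (candidate.f1 < current.f1) {
                    out.collect(candidate);
                }
            }
        });

// the improvements serve as both the solution-set delta and the next workset
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
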
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
deleted file mode 100644
index 81f4d00..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- * Input format that reads edges as pairs of positive integer vertex IDs separated by a configurable delimiter character.
- */
-public final class EdgeInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	public static final String ID_DELIMITER_CHAR = "edgeinput.delimiter";
-	
-	private final IntValue i1 = new IntValue();
-	private final IntValue i2 = new IntValue();
-	
-	private char delimiter;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		final int limit = offset + numBytes;
-		int first = 0, second = 0;
-		final char delimiter = this.delimiter;
-		
-		int pos = offset;
-		while (pos < limit && bytes[pos] != delimiter) {
-			first = first * 10 + (bytes[pos++] - '0');
-		}
-		pos += 1; // skip the delimiter
-		while (pos < limit) {
-			second = second * 10 + (bytes[pos++] - '0');
-		}
-		
-		if (first <= 0 || second <= 0 || first == second) {
-			return null;
-		}
-		
-		this.i1.setValue(first);
-		this.i2.setValue(second);
-		target.setField(0, this.i1);
-		target.setField(1, this.i2);
-		return target;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void configure(Configuration parameters) {
-		super.configure(parameters);
-		this.delimiter = (char) parameters.getInteger(ID_DELIMITER_CHAR, ',');
-	}
-}

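The hand-rolled digit loop above avoids allocations but assumes well-formed ASCII input. For comparison, a hypothetical equivalent using standard library calls (slower and allocation-heavy, but easier to read); not part of this commit:

// returns {v1, v2}, or null for lines the format would reject
static int[] parseEdge(String line, char delimiter) {
    String[] parts = line.split(java.util.regex.Pattern.quote(String.valueOf(delimiter)));
    if (parts.length != 2) {
        return null; // wrong number of fields
    }
    try {
        int first = Integer.parseInt(parts[0].trim());
        int second = Integer.parseInt(parts[1].trim());
        // same filter as readRecord above: positive IDs, no loop edges
        return (first > 0 && second > 0 && first != second) ? new int[] { first, second } : null;
    } catch (NumberFormatException e) {
        return null; // non-numeric field
    }
}
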
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
deleted file mode 100644
index 8441602..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- * Input format that reads edges augmented with vertex degrees. The data to be read is assumed to be in
- * the format <code>v1,d1|v2,d2\n</code>, where <code>v1</code> and <code>v2</code> are the IDs of the first and
- * second vertex, while <code>d1</code> and <code>d2</code> are the vertex degrees.
- * <p>
- * The result record holds the fields in the sequence <code>(v1, v2, d1, d2)</code>.
- * <p>
- * The delimiters are configurable. The default delimiter between vertex ID and
- * vertex degree is the comma (<code>,</code>). The default delimiter between the two vertices is
- * the vertical bar (<code>|</code>).
- */
-public final class EdgeWithDegreesInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	public static final String VERTEX_DELIMITER_CHAR = "edgeinput.vertexdelimiter";
-	public static final String DEGREE_DELIMITER_CHAR = "edgeinput.degreedelimiter";
-	
-	private final IntValue v1 = new IntValue();
-	private final IntValue v2 = new IntValue();
-	private final IntValue d1 = new IntValue();
-	private final IntValue d2 = new IntValue();
-	
-	private char vertexDelimiter;
-	private char degreeDelimiter;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		final int limit = offset + numBytes;
-		int firstV = 0, secondV = 0;
-		int firstD = 0, secondD = 0;
-		
-		final char vertexDelimiter = this.vertexDelimiter;
-		final char degreeDelimiter = this.degreeDelimiter;
-		
-		int pos = offset;
-		
-		// read the first vertex ID
-		while (pos < limit && bytes[pos] != degreeDelimiter) {
-			firstV = firstV * 10 + (bytes[pos++] - '0');
-		}
-		
-		pos += 1; // skip the delimiter
-		
-		// read the first vertex degree
-		while (pos < limit && bytes[pos] != vertexDelimiter) {
-			firstD = firstD * 10 + (bytes[pos++] - '0');
-		}
-		
-		pos += 1; // skip the delimiter
-		
-		// read the second vertex ID
-		while (pos < limit && bytes[pos] != degreeDelimiter) {
-			secondV = secondV * 10 + (bytes[pos++] - '0');
-		}
-		
-		pos += 1; // skip the delimiter
-		
-		// read the second vertex degree
-		while (pos < limit) {
-			secondD = secondD * 10 + (bytes[pos++] - '0');
-		}
-		
-		if (firstV <= 0 || secondV <= 0 || firstV == secondV) {
-			return null;
-		}
-		
-		v1.setValue(firstV);
-		v2.setValue(secondV);
-		d1.setValue(firstD);
-		d2.setValue(secondD);
-		
-		target.setField(0, v1);
-		target.setField(1, v2);
-		target.setField(2, d1);
-		target.setField(3, d2);
-		
-		return target;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void configure(Configuration parameters) {
-		super.configure(parameters);
-		this.vertexDelimiter = (char) parameters.getInteger(VERTEX_DELIMITER_CHAR, '|');
-		this.degreeDelimiter = (char) parameters.getInteger(DEGREE_DELIMITER_CHAR, ',');
-	}
-}

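As with EdgeInputFormat, a hypothetical split-based rendering of the same two-level parse (vertex delimiter '|', degree delimiter ','), returning the fields in the record order (v1, v2, d1, d2); for illustration only, not part of this commit:

static int[] parseEdgeWithDegrees(String line) {
    String[] vertices = line.split("\\|"); // "v1,d1" and "v2,d2"
    if (vertices.length != 2) {
        return null;
    }
    String[] first = vertices[0].split(",");
    String[] second = vertices[1].split(",");
    if (first.length != 2 || second.length != 2) {
        return null;
    }
    int v1 = Integer.parseInt(first[0]);
    int d1 = Integer.parseInt(first[1]);
    int v2 = Integer.parseInt(second[0]);
    int d2 = Integer.parseInt(second[1]);
    // same filter as readRecord above: positive vertex IDs, no loop edges
    return (v1 > 0 && v2 > 0 && v1 != v2) ? new int[] { v1, v2, d1, d2 } : null;
}
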
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
deleted file mode 100644
index a72a355..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- * Output format that writes edges together with their vertex degrees as <code>v1,d1|v2,d2</code>, the counterpart to {@link EdgeWithDegreesInputFormat}.
- */
-public final class EdgeWithDegreesOutputFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final StringBuilder line = new StringBuilder();
-
-	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		final int e1 = rec.getField(0, IntValue.class).getValue();
-		final int e2 = rec.getField(1, IntValue.class).getValue();
-		final int e3 = rec.getField(2, IntValue.class).getValue();
-		final int e4 = rec.getField(3, IntValue.class).getValue();
-		
-		this.line.setLength(0);
-		this.line.append(e1);
-		this.line.append(',');
-		this.line.append(e3);
-		this.line.append('|');
-		this.line.append(e2);
-		this.line.append(',');
-		this.line.append(e4);
-		
-		if (target.length >= line.length()) {
-			for (int i = 0; i < line.length(); i++) {
-				target[i] = (byte) line.charAt(i);
-			}
-			return line.length();
-		}
-		else {
-			return -line.length();
-		}
-	}
-}

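The return convention of serializeRecord above (and of TriangleOutputFormat in the next hunk) follows from the code: a non-negative value is the number of bytes written into the target buffer, while a negative value reports the required buffer length. A hypothetical caller illustrating the grow-and-retry loop; format, record, and stream are assumed to exist in scope:

byte[] buffer = new byte[64];
int len = format.serializeRecord(record, buffer);
if (len < 0) {
    // buffer too small: grow to the reported size and retry
    buffer = new byte[-len];
    len = format.serializeRecord(record, buffer);
}
stream.write(buffer, 0, len);
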
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
deleted file mode 100644
index f6c27b0..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- * Output format that writes triangles as comma-separated triples of vertex IDs.
- */
-public final class TriangleOutputFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final StringBuilder line = new StringBuilder();
-
-	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		final int e1 = rec.getField(0, IntValue.class).getValue();
-		final int e2 = rec.getField(1, IntValue.class).getValue();
-		final int e3 = rec.getField(2, IntValue.class).getValue();
-		
-		this.line.setLength(0);
-		this.line.append(e1);
-		this.line.append(',');
-		this.line.append(e2);
-		this.line.append(',');
-		this.line.append(e3);
-		
-		if (target.length >= line.length()) {
-			for (int i = 0; i < line.length(); i++) {
-				target[i] = (byte) line.charAt(i);
-			}
-			return line.length();
-		} else {
-			return -line.length();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
deleted file mode 100644
index d528f5d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class KMeansBroadcast implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int parallelism = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataPointInput = (args.length > 1 ? args[1] : "");
-		String clusterInput = (args.length > 2 ? args[2] : "");
-		String output = (args.length > 3 ? args[3] : "");
-		int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 2);
-
-		// data source for the data point input
-		@SuppressWarnings("unchecked")
-		FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points");
-
-		// data source for cluster center input
-		@SuppressWarnings("unchecked")
-		FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers");
-		
-		MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build();
-		
-		MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build();
-		
-		// ---------------------- Begin K-Means Loop ---------------------
-		
-		BulkIteration iter = new BulkIteration("k-means loop");
-		iter.setInput(clusterPoints);
-		iter.setMaximumNumberOfIterations(numIterations);
-
-		// compute the distances and select the closest center
-		MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter())
-			.setBroadcastVariable("centers", iter.getPartialSolution())
-			.input(dataPoints)
-			.name("Find Nearest Centers")
-			.build();
-
-		// computing the new cluster positions
-		ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
-			.input(findNearestClusterCenters)
-			.name("Recompute Center Positions")
-			.build();
-		
-		iter.setNextPartialSolution(recomputeClusterCenter);
-
-		// ---------------------- End K-Means Loop ---------------------
-		
-		// create DataSinkContract for writing the new cluster positions
-		FileDataSink newClusterPoints = new FileDataSink(new PointOutFormat(), output, iter, "New Center Positions");
-
-		Plan plan = new Plan(newClusterPoints, "K-Means");
-		plan.setDefaultParallelism(parallelism);
-		return plan;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <numSubTasks> <dataPoints> <clusterCenters> <output> <numIterations>";
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Data Types and UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * A simple three-dimensional point.
-	 */
-	public static final class Point implements Value {
-		private static final long serialVersionUID = 1L;
-		
-		public double x, y, z;
-		
-		public Point() {}
-
-		public Point(double x, double y, double z) {
-			this.x = x;
-			this.y = y;
-			this.z = z;
-		}
-		
-		public void add(Point other) {
-			x += other.x;
-			y += other.y;
-			z += other.z;
-		}
-		
-		public Point div(long val) {
-			x /= val;
-			y /= val;
-			z /= val;
-			return this;
-		}
-		
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y) + (z-other.z)*(z-other.z));
-		}
-		
-		public void clear() {
-			x = y = z = 0.0;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeDouble(x);
-			out.writeDouble(y);
-			out.writeDouble(z);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			x = in.readDouble();
-			y = in.readDouble();
-			z = in.readDouble();
-		}
-		
-		@Override
-		public String toString() {
-			return "(" + x + "|" + y + "|" + z + ")";
-		}
-	}
-	
-	public static final class PointWithId {
-		
-		public int id;
-		public Point point;
-		
-		public PointWithId(int id, Point p) {
-			this.id = id;
-			this.point = p;
-		}
-	}
-	
-	/**
-	 * Determines the closest cluster center for a data point.
-	 */
-	public static final class SelectNearestCenter extends MapFunction {
-		private static final long serialVersionUID = 1L;
-
-		private final IntValue one = new IntValue(1);
-		private final Record result = new Record(3);
-
-		private List<PointWithId> centers = new ArrayList<PointWithId>();
-
-		/**
-		 * Reads all the center values from the broadcast variable into a collection.
-		 */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			List<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
-			
-			centers.clear();
-			synchronized (clusterCenters) {
-				for (Record r : clusterCenters) {
-					centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
-				}
-			}
-		}
-
-		/**
-		 * Computes a minimum aggregation on the distance of a data point to cluster centers.
-		 * 
-		 * Output Format:
-		 * 0: centerID
-		 * 1: pointVector
-		 * 2: constant(1) (to enable combinable average computation in the following reducer)
-		 */
-		@Override
-		public void map(Record dataPointRecord, Collector<Record> out) {
-			Point p = dataPointRecord.getField(1, Point.class);
-			
-			double nearestDistance = Double.MAX_VALUE;
-			int centerId = -1;
-
-			// check all cluster centers
-			for (PointWithId center : centers) {
-				// compute distance
-				double distance = p.euclideanDistance(center.point);
-				
-				// update nearest cluster if necessary 
-				if (distance < nearestDistance) {
-					nearestDistance = distance;
-					centerId = center.id;
-				}
-			}
-
-			// emit a new record with the center id and the data point. add a one to ease the
-			// implementation of the average function with a combiner
-			result.setField(0, new IntValue(centerId));
-			result.setField(1, p);
-			result.setField(2, one);
-
-			out.collect(result);
-		}
-	}
-	
-	@Combinable
-	public static final class RecomputeClusterCenter extends ReduceFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private final Point p = new Point();
-		
-		
-		/**
-		 * Compute the new position (coordinate vector) of a cluster center.
-		 */
-		@Override
-		public void reduce(Iterator<Record> points, Collector<Record> out) {
-			Record sum = sumPointsAndCount(points);
-			sum.setField(1, sum.getField(1, Point.class).div(sum.getField(2, IntValue.class).getValue()));
-			out.collect(sum);
-		}
-
-		/**
-		 * Computes a pre-aggregated average value of a coordinate vector.
-		 */
-		@Override
-		public void combine(Iterator<Record> points, Collector<Record> out) {
-			out.collect(sumPointsAndCount(points));
-		}
-		
-		private final Record sumPointsAndCount(Iterator<Record> dataPoints) {
-			Record next = null;
-			p.clear();
-			int count = 0;
-			
-			// compute coordinate vector sum and count
-			while (dataPoints.hasNext()) {
-				next = dataPoints.next();
-				p.add(next.getField(1, Point.class));
-				count += next.getField(2, IntValue.class).getValue();
-			}
-			
-			next.setField(1, p);
-			next.setField(2, new IntValue(count));
-			return next;
-		}
-	}
-	
-	public static final class PointBuilder extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			double x = record.getField(1, DoubleValue.class).getValue();
-			double y = record.getField(2, DoubleValue.class).getValue();
-			double z = record.getField(3, DoubleValue.class).getValue();
-			
-			record.setField(1, new Point(x, y, z));
-			out.collect(record);
-		}
-	}
-	
-	public static final class PointOutFormat extends FileOutputFormat {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final String format = "%d|%.1f|%.1f|%.1f|\n";
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			int id = record.getField(0, IntValue.class).getValue();
-			Point p = record.getField(1, Point.class);
-			
-			byte[] bytes = String.format(format, id, p.x, p.y, p.z).getBytes();
-			
-			this.stream.write(bytes);
-		}
-	}
-	
-	public static void main(String[] args) throws Exception {
-		System.out.println(LocalExecutor.optimizerPlanAsJSON(new KMeansBroadcast().getPlan("4", "/dev/random", "/dev/random", "/tmp", "20")));
-	}
-}

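The broadcast-variable pattern above carries over directly to the DataSet API: attach the current centers with withBroadcastSet and re-read them in open() at the start of each iteration. A minimal sketch with two-dimensional points; the class name and tuple layouts are illustrative assumptions, not part of this commit:

import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

// maps a point (x, y) to the id of its nearest center; centers are (id, x, y)
public class NearestCenterSketch extends RichMapFunction<Tuple2<Double, Double>, Integer> {

    private List<Tuple3<Integer, Double, Double>> centers;

    @Override
    public void open(Configuration parameters) {
        // re-read the broadcast centers at the start of each iteration
        this.centers = getRuntimeContext().getBroadcastVariable("centers");
    }

    @Override
    public Integer map(Tuple2<Double, Double> p) {
        int nearestId = -1;
        double nearestDist = Double.MAX_VALUE;
        for (Tuple3<Integer, Double, Double> c : centers) {
            double dx = p.f0 - c.f1;
            double dy = p.f1 - c.f2;
            double dist = dx * dx + dy * dy; // squared distance suffices for the argmin
            if (dist < nearestDist) {
                nearestDist = dist;
                nearestId = c.f0;
            }
        }
        return nearestId;
    }
}

// usage inside the loop (points and centers are assumed DataSets):
//   points.map(new NearestCenterSketch()).withBroadcastSet(centers, "centers");
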
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
deleted file mode 100644
index 8d75d47..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.test.recordJobs.kmeans.udfs.ComputeDistance;
-import org.apache.flink.test.recordJobs.kmeans.udfs.FindNearestCenter;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.RecomputeClusterCenter;
-import org.apache.flink.types.IntValue;
-
-@SuppressWarnings("deprecation")
-public class KMeansCross implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		final String dataPointInput = (args.length > 1 ? args[1] : "");
-		final String clusterInput = (args.length > 2 ? args[2] : "");
-		final String output = (args.length > 3 ? args[3] : "");
-		final int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
-		// create DataSourceContract for cluster center input
-		FileDataSource initialClusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers");
-		initialClusterPoints.setParallelism(1);
-		
-		BulkIteration iteration = new BulkIteration("K-Means Loop");
-		iteration.setInput(initialClusterPoints);
-		iteration.setMaximumNumberOfIterations(numIterations);
-		
-		// create DataSourceContract for data point input
-		FileDataSource dataPoints = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points");
-
-		// create CrossOperator for distance computation
-		CrossOperator computeDistance = CrossOperator.builder(new ComputeDistance())
-				.input1(dataPoints)
-				.input2(iteration.getPartialSolution())
-				.name("Compute Distances")
-				.build();
-
-		// create ReduceOperator for finding the nearest cluster centers
-		ReduceOperator findNearestClusterCenters = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
-				.input(computeDistance)
-				.name("Find Nearest Centers")
-				.build();
-
-		// create ReduceOperator for computing new cluster positions
-		ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
-				.input(findNearestClusterCenters)
-				.name("Recompute Center Positions")
-				.build();
-		iteration.setNextPartialSolution(recomputeClusterCenter);
-		
-		// create DataSourceContract for data point input
-		FileDataSource dataPoints2 = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points 2");
-		
-		// compute distance of points to final clusters 
-		CrossOperator computeFinalDistance = CrossOperator.builder(new ComputeDistance())
-				.input1(dataPoints2)
-				.input2(iteration)
-				.name("Compute Final Distances")
-				.build();
-
-		// find nearest final cluster for point
-		ReduceOperator findNearestFinalCluster = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
-				.input(computeFinalDistance)
-				.name("Find Nearest Final Centers")
-				.build();
-
-		// create DataSinkContract for writing the new cluster positions
-		FileDataSink finalClusters = new FileDataSink(new PointOutFormat(), output+"/centers", iteration, "Cluster Positions");
-
-		// write assigned clusters
-		FileDataSink clusterAssignments = new FileDataSink(new PointOutFormat(), output+"/points", findNearestFinalCluster, "Cluster Assignments");
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(finalClusters);
-		sinks.add(clusterAssignments);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Iterative KMeans");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output> <numIterations>";
-	}
-	
-	public static void main(String[] args) throws Exception {
-		KMeansCross kmi = new KMeansCross();
-		
-		if (args.length < 5) {
-			System.err.println(kmi.getDescription());
-			System.exit(1);
-		}
-		
-		Plan plan = kmi.getPlan(args);
-		
-		// This executes the KMeans clustering job embedded in a local context.
-		LocalExecutor.execute(plan);
-
-	}
-}
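
KMeansCross drives its loop with the Record API's BulkIteration
(setInput / setMaximumNumberOfIterations / setNextPartialSolution). The
DataSet API expresses the same pattern with iterate()/closeWith(); a toy
sketch under that assumption, with a trivial loop body standing in for the
center recomputation:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;

    public class BulkIterationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // initial partial solution, analogous to the initial cluster centers
            DataSet<Double> initial = env.fromElements(0.0);

            // like BulkIteration with setMaximumNumberOfIterations(10)
            IterativeDataSet<Double> loop = initial.iterate(10);

            // the loop body; a real job would reassign points and
            // recompute the cluster centers here
            DataSet<Double> step = loop.map(new MapFunction<Double, Double>() {
                @Override
                public Double map(Double value) {
                    return value + 1.0;
                }
            });

            // like setNextPartialSolution(...): feed the result back in
            DataSet<Double> result = loop.closeWith(step);

            result.print(); // prints 10.0
        }
    }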

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
deleted file mode 100644
index bdf7466..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class KMeansSingleStep implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataPointInput = (args.length > 1 ? args[1] : "");
-		String clusterInput = (args.length > 2 ? args[2] : "");
-		String output = (args.length > 3 ? args[3] : "");
-
-		// create DataSourceContract for data point input
-		@SuppressWarnings("unchecked")
-		FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points");
-
-		// create DataSourceContract for cluster center input
-		@SuppressWarnings("unchecked")
-		FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers");
-		
-		MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build();
-		
-		MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build();
-
-		// the mapper computes the distance to all points, which it draws from a broadcast variable
-		MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter())
-			.setBroadcastVariable("centers", clusterPoints)
-			.input(dataPoints)
-			.name("Find Nearest Centers")
-			.build();
-
-		// create a reducer that recomputes the cluster centers as the average of all associated data points
-		ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
-			.input(findNearestClusterCenters)
-			.name("Recompute Center Positions")
-			.build();
-
-		// create DataSinkContract for writing the new cluster positions
-		FileDataSink newClusterPoints = new FileDataSink(new PointOutFormat(), output, recomputeClusterCenter, "New Center Positions");
-
-		// return the plan
-		Plan plan = new Plan(newClusterPoints, "KMeans Iteration");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output>";
-	}
-	
-	public static final class Point implements Value {
-		private static final long serialVersionUID = 1L;
-		
-		public double x, y, z;
-		
-		public Point() {}
-
-		public Point(double x, double y, double z) {
-			this.x = x;
-			this.y = y;
-			this.z = z;
-		}
-		
-		public void add(Point other) {
-			x += other.x;
-			y += other.y;
-			z += other.z;
-		}
-		
-		public Point div(long val) {
-			x /= val;
-			y /= val;
-			z /= val;
-			return this;
-		}
-		
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y) + (z-other.z)*(z-other.z));
-		}
-		
-		public void clear() {
-			x = y = z = 0.0;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeDouble(x);
-			out.writeDouble(y);
-			out.writeDouble(z);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			x = in.readDouble();
-			y = in.readDouble();
-			z = in.readDouble();
-		}
-		
-		@Override
-		public String toString() {
-			return "(" + x + "|" + y + "|" + z + ")";
-		}
-	}
-	
-	public static final class PointWithId {
-		
-		public int id;
-		public Point point;
-		
-		public PointWithId(int id, Point p) {
-			this.id = id;
-			this.point = p;
-		}
-	}
-	
-	/**
-	 * Determines the closest cluster center for a data point.
-	 */
-	public static final class SelectNearestCenter extends MapFunction {
-		private static final long serialVersionUID = 1L;
-
-		private final IntValue one = new IntValue(1);
-		private final Record result = new Record(3);
-
-		private List<PointWithId> centers = new ArrayList<PointWithId>();
-
-		/**
-		 * Reads all the center values from the broadcast variable into a collection.
-		 */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			Collection<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
-			
-			centers.clear();
-			for (Record r : clusterCenters) {
-				centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
-			}
-		}
-
-		/**
-		 * Computes a minimum aggregation on the distance of a data point to cluster centers.
-		 * 
-		 * Output Format:
-		 * 0: centerID
-		 * 1: pointVector
-		 * 2: constant(1) (to enable combinable average computation in the following reducer)
-		 */
-		@Override
-		public void map(Record dataPointRecord, Collector<Record> out) {
-			Point p = dataPointRecord.getField(1, Point.class);
-			
-			double nearestDistance = Double.MAX_VALUE;
-			int centerId = -1;
-
-			// check all cluster centers
-			for (PointWithId center : centers) {
-				// compute distance
-				double distance = p.euclideanDistance(center.point);
-				
-				// update nearest cluster if necessary 
-				if (distance < nearestDistance) {
-					nearestDistance = distance;
-					centerId = center.id;
-				}
-			}
-
-			// emit a new record with the center id and the data point. add a one to ease the
-			// implementation of the average function with a combiner
-			result.setField(0, new IntValue(centerId));
-			result.setField(1, p);
-			result.setField(2, one);
-
-			out.collect(result);
-		}
-	}
-	
-	@Combinable
-	public static final class RecomputeClusterCenter extends ReduceFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private final Point p = new Point();
-		
-		
-		/**
-		 * Compute the new position (coordinate vector) of a cluster center.
-		 */
-		@Override
-		public void reduce(Iterator<Record> points, Collector<Record> out) {
-			Record sum = sumPointsAndCount(points);
-			sum.setField(1, sum.getField(1, Point.class).div(sum.getField(2, IntValue.class).getValue()));
-			out.collect(sum);
-		}
-
-		/**
-		 * Computes a pre-aggregated average value of a coordinate vector.
-		 */
-		@Override
-		public void combine(Iterator<Record> points, Collector<Record> out) {
-			out.collect(sumPointsAndCount(points));
-		}
-		
-		private final Record sumPointsAndCount(Iterator<Record> dataPoints) {
-			Record next = null;
-			p.clear();
-			int count = 0;
-			
-			// compute coordinate vector sum and count
-			while (dataPoints.hasNext()) {
-				next = dataPoints.next();
-				p.add(next.getField(1, Point.class));
-				count += next.getField(2, IntValue.class).getValue();
-			}
-			
-			next.setField(1, p);
-			next.setField(2, new IntValue(count));
-			return next;
-		}
-	}
-	
-	public static final class PointBuilder extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			double x = record.getField(1, DoubleValue.class).getValue();
-			double y = record.getField(2, DoubleValue.class).getValue();
-			double z = record.getField(3, DoubleValue.class).getValue();
-			
-			record.setField(1, new Point(x, y, z));
-			out.collect(record);
-		}
-	}
-	
-	public static final class PointOutFormat extends FileOutputFormat {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final String format = "%d|%.1f|%.1f|%.1f|\n";
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			int id = record.getField(0, IntValue.class).getValue();
-			Point p = record.getField(1, Point.class);
-			
-			byte[] bytes = String.format(format, id, p.x, p.y, p.z).getBytes();
-			
-			this.stream.write(bytes);
-		}
-	}
-}
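
SelectNearestCenter above caches the broadcast "centers" collection in
open() and scans it once per record. The same pattern in the DataSet API,
sketched with one-dimensional points to keep it short (names and data are
illustrative):

    import java.util.List;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class BroadcastNearestCenterSketch {

        // Caches the broadcast centers in open(), then returns the index
        // of the closest center for every input element.
        public static class NearestCenter extends RichMapFunction<Double, Integer> {
            private List<Double> centers;

            @Override
            public void open(Configuration parameters) {
                this.centers = getRuntimeContext().getBroadcastVariable("centers");
            }

            @Override
            public Integer map(Double point) {
                int nearestId = -1;
                double nearestDistance = Double.MAX_VALUE;
                for (int i = 0; i < centers.size(); i++) {
                    double distance = Math.abs(point - centers.get(i));
                    if (distance < nearestDistance) {
                        nearestDistance = distance;
                        nearestId = i;
                    }
                }
                return nearestId;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Double> centers = env.fromElements(0.0, 10.0);
            DataSet<Double> points = env.fromElements(1.0, 2.0, 9.0);

            points.map(new NearestCenter())
                  .withBroadcastSet(centers, "centers")
                  .print(); // 0, 0, 1
        }
    }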

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
deleted file mode 100644
index ee33113..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Cross PACT computes the distance of all data points to all cluster
- * centers.
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst({0,1})
-public class ComputeDistance extends CrossFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	private final DoubleValue distance = new DoubleValue();
-	
-	/**
-	 * Computes the distance of one data point to one cluster center.
-	 * 
-	 * Output Format:
-	 * 0: pointID
-	 * 1: pointVector
-	 * 2: clusterID
-	 * 3: distance
-	 */
-	@Override
-	public Record cross(Record dataPointRecord, Record clusterCenterRecord) throws Exception {
-
-		CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-
-		IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
-		CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-
-		this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-
-		// add cluster center id and distance to the data point record
-		dataPointRecord.setField(2, clusterCenterId);
-		dataPointRecord.setField(3, this.distance);
-
-		return dataPointRecord;
-	}
-}
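
The DataSet API counterpart of this Cross PACT is
DataSet.cross(...).with(a CrossFunction). A rough sketch with
one-dimensional coordinates (illustrative names and data, not the removed
code):

    import org.apache.flink.api.common.functions.CrossFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class CrossDistanceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // (pointId, coordinate) and (centerId, coordinate)
            DataSet<Tuple2<Integer, Double>> points = env.fromElements(
                    Tuple2.of(1, 1.0), Tuple2.of(2, 9.0));
            DataSet<Tuple2<Integer, Double>> centers = env.fromElements(
                    Tuple2.of(0, 0.0), Tuple2.of(1, 10.0));

            // build the point x center product and attach the distance,
            // producing (pointId, centerId, distance) triples
            DataSet<Tuple3<Integer, Integer, Double>> distances = points
                    .cross(centers)
                    .with(new CrossFunction<Tuple2<Integer, Double>,
                            Tuple2<Integer, Double>, Tuple3<Integer, Integer, Double>>() {
                        @Override
                        public Tuple3<Integer, Integer, Double> cross(
                                Tuple2<Integer, Double> point, Tuple2<Integer, Double> center) {
                            return Tuple3.of(point.f0, center.f0, Math.abs(point.f1 - center.f1));
                        }
                    });

            distances.print();
        }
    }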

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
deleted file mode 100644
index 78b60ef..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Map PACT computes the distance of all data points to all cluster
- * centers, which it reads from a broadcast variable.
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst({0,1})
-public class ComputeDistanceParameterized extends MapFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	private final DoubleValue distance = new DoubleValue();
-	
-	private Collection<Record> clusterCenters;
-	
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		this.clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
-	}
-	
-	/**
-	 * Computes the distance of one data point to one cluster center.
-	 * 
-	 * Output Format:
-	 * 0: pointID
-	 * 1: pointVector
-	 * 2: clusterID
-	 * 3: distance
-	 */
-	@Override
-	public void map(Record dataPointRecord, Collector<Record> out) {
-		
-		CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-		
-		for (Record clusterCenterRecord : this.clusterCenters) {
-			IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
-			CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-		
-			this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-			
-			// add cluster center id and distance to the data point record 
-			dataPointRecord.setField(2, clusterCenterId);
-			dataPointRecord.setField(3, this.distance);
-			
-			out.collect(dataPointRecord);
-		}
-	}
-}
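
Unlike the Cross variant, this class fans out one record per broadcast
center from inside a map. In the DataSet API that shape corresponds to a
RichFlatMapFunction over a broadcast set; a condensed sketch with
one-dimensional points (illustrative names):

    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class DistancePerCenterSketch {

        // Emits one (centerId, distance) pair per broadcast center for
        // each input point.
        public static class DistancePerCenter
                extends RichFlatMapFunction<Double, Tuple2<Integer, Double>> {
            private List<Tuple2<Integer, Double>> centers;

            @Override
            public void open(Configuration parameters) {
                this.centers = getRuntimeContext().getBroadcastVariable("centers");
            }

            @Override
            public void flatMap(Double point, Collector<Tuple2<Integer, Double>> out) {
                for (Tuple2<Integer, Double> center : centers) {
                    out.collect(Tuple2.of(center.f0, Math.abs(point - center.f1)));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, Double>> centers = env.fromElements(
                    Tuple2.of(0, 0.0), Tuple2.of(1, 10.0));

            env.fromElements(1.0, 8.5)
               .flatMap(new DistancePerCenter())
               .withBroadcastSet(centers, "centers")
               .print();
        }
    }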

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
deleted file mode 100644
index 67e87a3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-
-/**
- * Implements a feature vector as a multi-dimensional point. Coordinates of that point
- * (= the features) are stored as double values. The distance between two feature vectors is
- * the Euclidean distance between the points.
- */
-public final class CoordVector implements Key<CoordVector> {
-	private static final long serialVersionUID = 1L;
-	
-	// coordinate array
-	private double[] coordinates;
-
-	/**
-	 * Initializes a blank coordinate vector. Required for deserialization!
-	 */
-	public CoordVector() {
-		coordinates = null;
-	}
-
-	/**
-	 * Initializes a coordinate vector.
-	 * 
-	 * @param coordinates The coordinate vector of a multi-dimensional point.
-	 */
-	public CoordVector(Double[] coordinates) {
-		this.coordinates = new double[coordinates.length];
-		for (int i = 0; i < coordinates.length; i++) {
-			this.coordinates[i] = coordinates[i];
-		}
-	}
-
-	/**
-	 * Initializes a coordinate vector.
-	 * 
-	 * @param coordinates The coordinate vector of a multi-dimensional point.
-	 */
-	public CoordVector(double[] coordinates) {
-		this.coordinates = coordinates;
-	}
-
-	/**
-	 * Returns the coordinate vector of a multi-dimensional point.
-	 * 
-	 * @return The coordinate vector of a multi-dimensional point.
-	 */
-	public double[] getCoordinates() {
-		return this.coordinates;
-	}
-	
-	/**
-	 * Sets the coordinate vector of a multi-dimensional point.
-	 * 
-	 * @param coordinates The dimension values of the point.
-	 */
-	public void setCoordinates(double[] coordinates) {
-		this.coordinates = coordinates;
-	}
-
-	/**
- * Computes the Euclidean distance between this coordinate vector and a
-	 * second coordinate vector.
-	 * 
-	 * @param cv The coordinate vector to which the distance is computed.
- * @return The Euclidean distance to coordinate vector cv. If cv has a
-	 *         different length than this coordinate vector, -1 is returned.
-	 */
-	public double computeEuclidianDistance(CoordVector cv) {
-		// check coordinate vector lengths
-		if (cv.coordinates.length != this.coordinates.length) {
-			return -1.0;
-		}
-
-		double quadSum = 0.0;
-		for (int i = 0; i < this.coordinates.length; i++) {
-			double diff = this.coordinates[i] - cv.coordinates[i];
-			quadSum += diff*diff;
-		}
-		return Math.sqrt(quadSum);
-	}
-
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		int length = in.readInt();
-		this.coordinates = new double[length];
-		for (int i = 0; i < length; i++) {
-			this.coordinates[i] = in.readDouble();
-		}
-	}
-
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.coordinates.length);
-		for (int i = 0; i < this.coordinates.length; i++) {
-			out.writeDouble(this.coordinates[i]);
-		}
-	}
-
-	/**
-	 * Compares this coordinate vector to another key.
-	 * 
-	 * @return -1 if the other key is not of type CoordVector. If the other
-	 *         key is also a CoordVector but its length differs from this
- *         coordinate vector, -1 is returned if this coordinate vector is
-	 *         smaller and 1 if it is larger. If both coordinate vectors
-	 *         have the same length, the coordinates of both are compared.
-	 *         If a coordinate of this coordinate vector is smaller than the
-	 *         corresponding coordinate of the other vector -1 is returned
-	 *         and 1 otherwise. If all coordinates are identical 0 is
-	 *         returned.
-	 */
-	@Override
-	public int compareTo(CoordVector o) {
-		// check if both coordinate vectors have identical lengths
-		if (o.coordinates.length > this.coordinates.length) {
-			return -1;
-		}
-		else if (o.coordinates.length < this.coordinates.length) {
-			return 1;
-		}
-
-		// compare all coordinates
-		for (int i = 0; i < this.coordinates.length; i++) {
-			if (o.coordinates[i] > this.coordinates[i]) {
-				return -1;
-			} else if (o.coordinates[i] < this.coordinates[i]) {
-				return 1;
-			}
-		}
-		return 0;
-	}
-}
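
CoordVector.computeEuclidianDistance is the standard Euclidean metric over
the two coordinate arrays. A plain-Java sketch of exactly that computation,
including the class's -1.0 convention for mismatched lengths:

    public class EuclideanDistanceSketch {

        // Square root of the sum of squared component-wise differences.
        static double euclideanDistance(double[] a, double[] b) {
            if (a.length != b.length) {
                return -1.0; // length mismatch, as in the removed class
            }
            double quadSum = 0.0;
            for (int i = 0; i < a.length; i++) {
                double diff = a[i] - b[i];
                quadSum += diff * diff;
            }
            return Math.sqrt(quadSum);
        }

        public static void main(String[] args) {
            System.out.println(euclideanDistance(
                    new double[] {1.0, 2.0, 2.0}, new double[] {0.0, 0.0, 0.0}));
            // prints 3.0
        }
    }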

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
deleted file mode 100644
index 1e893ce..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Reduce PACT determines the closest cluster center for a data point. This
- * is a minimum aggregation. Hence, a Combiner can be easily implemented.
- */
-@SuppressWarnings("deprecation")
-@Combinable
-@ConstantFields(1)
-public class FindNearestCenter extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue centerId = new IntValue();
-	private final CoordVector position = new CoordVector();
-	private final IntValue one = new IntValue(1);
-	
-	private final Record result = new Record(3);
-	
-	/**
-	 * Computes a minimum aggregation on the distance of a data point to
-	 * cluster centers. 
-	 * 
-	 * Output Format:
-	 * 0: centerID
-	 * 1: pointVector
-	 * 2: constant(1) (to enable combinable average computation in the following reducer)
-	 */
-	@Override
-	public void reduce(Iterator<Record> pointsWithDistance, Collector<Record> out) {
-		double nearestDistance = Double.MAX_VALUE;
-		int nearestClusterId = 0;
-
-		// check all cluster centers
-		while (pointsWithDistance.hasNext()) {
-			Record res = pointsWithDistance.next();
-			
-			double distance = res.getField(3, DoubleValue.class).getValue();
-
-			// compare distances
-			if (distance < nearestDistance) {
-				// if distance is smaller than smallest till now, update nearest cluster
-				nearestDistance = distance;
-				nearestClusterId = res.getField(2, IntValue.class).getValue();
-				res.getFieldInto(1, this.position);
-			}
-		}
-
-		// emit a new record with the center id and the data point. add a one to ease the
-		// implementation of the average function with a combiner
-		this.centerId.setValue(nearestClusterId);
-		this.result.setField(0, this.centerId);
-		this.result.setField(1, this.position);
-		this.result.setField(2, this.one);
-			
-		out.collect(this.result);
-	}
-
-	// ----------------------------------------------------------------------------------------
-	
-	private final Record nearest = new Record();
-	
-	/**
-	 * Computes a minimum aggregation on the distance of a data point to
-	 * cluster centers.
-	 */
-	@Override
-	public void combine(Iterator<Record> pointsWithDistance, Collector<Record> out) {
-		double nearestDistance = Double.MAX_VALUE;
-
-		// check all cluster centers
-		while (pointsWithDistance.hasNext()) {
-			Record res = pointsWithDistance.next();
-			double distance = res.getField(3, DoubleValue.class).getValue();
-
-			// compare distances
-			if (distance < nearestDistance) {
-				nearestDistance = distance;
-				res.copyTo(this.nearest);
-			}
-		}
-
-		// emit nearest one
-		out.collect(this.nearest);
-	}
-}
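
FindNearestCenter's combiner can reuse the reducer's minimum selection
because taking a minimum is associative and commutative: pre-aggregating any
subset of the candidates never changes the final winner. A plain-Java sketch
of that min-by-distance selection (illustrative names):

    import java.util.Arrays;
    import java.util.List;

    public class MinByDistanceSketch {

        // Shaped like ComputeDistance's output: (pointId, centerId, distance).
        static final class PointDistance {
            final int pointId;
            final int centerId;
            final double distance;

            PointDistance(int pointId, int centerId, double distance) {
                this.pointId = pointId;
                this.centerId = centerId;
                this.distance = distance;
            }
        }

        // Works both as the combiner and as the final reduce.
        static PointDistance nearest(List<PointDistance> candidates) {
            PointDistance best = candidates.get(0);
            for (PointDistance candidate : candidates) {
                if (candidate.distance < best.distance) {
                    best = candidate;
                }
            }
            return best;
        }

        public static void main(String[] args) {
            List<PointDistance> forOnePoint = Arrays.asList(
                    new PointDistance(7, 0, 4.2),
                    new PointDistance(7, 1, 1.3),
                    new PointDistance(7, 2, 2.8));
            System.out.println(nearest(forOnePoint).centerId); // prints 1
        }
    }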