You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:55 UTC
[12/16] flink git commit: [FLINK-2901] Remove Record API dependencies
from flink-tests #2
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
deleted file mode 100644
index dc52158..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/EnumTrianglesWithDegrees.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.CountEdges;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.JoinCountsAndUniquify;
-import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees.ProjectEdge;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.BuildTriads;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.CloseTriads;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.ProjectOutCounts;
-import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees.ProjectToLowerDegreeVertex;
-import org.apache.flink.test.recordJobs.graph.triangleEnumUtil.EdgeInputFormat;
-import org.apache.flink.test.recordJobs.graph.triangleEnumUtil.TriangleOutputFormat;
-import org.apache.flink.types.IntValue;
-
-/**
- * An implementation of the triangle enumeration, which includes the pre-processing step
- * to compute the degrees of the vertices and to select the lower-degree vertex for the
- * enumeration of open triads.
- */
-@SuppressWarnings("deprecation")
-public class EnumTrianglesWithDegrees implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- final String edgeInput = args.length > 1 ? args[1] : "";
- final String output = args.length > 2 ? args[2] : "";
- final char delimiter = args.length > 3 ? (char) Integer.parseInt(args[3]) : ',';
-
-
- FileDataSource edges = new FileDataSource(new EdgeInputFormat(), edgeInput, "Input Edges");
- edges.setParameter(EdgeInputFormat.ID_DELIMITER_CHAR, delimiter);
-
- // =========================== Vertex Degree ============================
-
- MapOperator projectEdge = MapOperator.builder(new ProjectEdge())
- .input(edges).name("Project Edge").build();
-
- ReduceOperator edgeCounter = ReduceOperator.builder(new CountEdges(), IntValue.class, 0)
- .input(projectEdge).name("Count Edges for Vertex").build();
-
- ReduceOperator countJoiner = ReduceOperator.builder(new JoinCountsAndUniquify(), IntValue.class, 0)
- .keyField(IntValue.class, 1)
- .input(edgeCounter).name("Join Counts").build();
-
-
- // =========================== Triangle Enumeration ============================
-
- MapOperator toLowerDegreeEdge = MapOperator.builder(new ProjectToLowerDegreeVertex())
- .input(countJoiner).name("Select lower-degree Edge").build();
-
- MapOperator projectOutCounts = MapOperator.builder(new ProjectOutCounts())
- .input(countJoiner).name("Project out Counts").build();
-
- ReduceOperator buildTriads = ReduceOperator.builder(new BuildTriads(), IntValue.class, 0)
- .input(toLowerDegreeEdge).name("Build Triads").build();
-
- JoinOperator closeTriads = JoinOperator.builder(new CloseTriads(), IntValue.class, 1, 0)
- .keyField(IntValue.class, 2, 1)
- .input1(buildTriads).input2(projectOutCounts)
- .name("Close Triads").build();
- closeTriads.setParameter("INPUT_SHIP_STRATEGY", "SHIP_REPARTITION_HASH");
- closeTriads.setParameter("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");
-
- FileDataSink triangles = new FileDataSink(new TriangleOutputFormat(), output, closeTriads, "Triangles");
-
- Plan p = new Plan(triangles, "Enumerate Triangles");
- p.setDefaultParallelism(numSubTasks);
- return p;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: [noSubStasks] [input file] [output file]";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
deleted file mode 100644
index 34d4b60..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/PairwiseSP.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * Implementation of the Pairwise Shortest Path example PACT program.
- * The program implements one iteration of the algorithm and must be run multiple times until no changes are computed.
- *
- * The pairwise shortest path algorithm comes from the domain of graph problems. The goal is to find all shortest paths
- * between any two transitively connected nodes in a graph. In this implementation edges are interpreted as directed and weighted.
- *
- * For the first iteration, the program allows two input formats:
- * 1) RDF triples with foaf:knows predicates. A triple is interpreted as an edge from the RDF subject to the RDF object with weight 1.
- * 2) The program's text serialization for paths (see PathInFormat and PathOutFormat).
- *
- * The RDF input format is used if the 4th parameter of the getPlan() method is set to "true". If set to "false" the path input format is used.
- */
-@SuppressWarnings("deprecation")
-public class PairwiseSP implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Reads RDF triples and filters on the foaf:knows RDF predicate. The elements of a triple must be separated by whitespace.
- * The foaf:knows RDF predicate indicates that the RDF subject knows the object (typically of type foaf:person).
- * The connections between people are extracted and handled as graph edges. For the Pairwise Shortest Path algorithm the
- * connection is interpreted as a directed edge, i.e. subject knows object, but the object does not necessarily know the subject.
- *
- * The RDFTripleInFormat keeps only RDF triples with the foaf:knows predicate; for any other line readRecord returns null.
- * For each triple with foaf:knows predicate, a record is emitted with
- * - from-node being the RDF subject at field position 0,
- * - to-node being the RDF object at field position 1,
- * - length being 1 at field position 2,
- * - hop count being 0 at field position 3, and
- * - hopList being a single blank (" ") at field position 4.
- *
- */
- public static class RDFTripleInFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringValue fromNode = new StringValue();
- private final StringValue toNode = new StringValue();
- private final IntValue pathLength = new IntValue(1);
- private final IntValue hopCnt = new IntValue(0);
- private final StringValue hopList = new StringValue(" ");
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- String lineStr = new String(bytes, offset, numBytes);
- // collapse each run of whitespace into a single space and trim
- lineStr = lineStr.replaceAll("\\s+", " ").trim();
- // build whitespace tokenizer
- StringTokenizer st = new StringTokenizer(lineStr, " ");
-
- // line must have at least three elements
- if (st.countTokens() < 3) {
- return null;
- }
-
- String rdfSubj = st.nextToken();
- String rdfPred = st.nextToken();
- String rdfObj = st.nextToken();
-
- // we only want foaf:knows predicates
- if (!rdfPred.equals("<http://xmlns.com/foaf/0.1/knows>")) {
- return null;
- }
-
- // build node pair from subject and object
- fromNode.setValue(rdfSubj);
- toNode.setValue(rdfObj);
-
- target.setField(0, fromNode);
- target.setField(1, toNode);
- target.setField(2, pathLength);
- target.setField(3, hopCnt);
- target.setField(4, hopList);
-
- return target;
- }
- }
-
- /**
- * The PathInFormat reads paths consisting of a from-node, a to-node, a length, a hop count, and a hop node list serialized as a string.
- * All five elements of the path must be separated by the pipe character ('|') and may not contain any pipe characters themselves.
- *
- * PathInFormat returns records with:
- * - from-node at field position 0,
- * - to-node at field position 1,
- * - length at field position 2,
- * - hop count at field position 3,
- * - hop list at field position 4.
- */
- public static class PathInFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringValue fromNode = new StringValue();
- private final StringValue toNode = new StringValue();
- private final IntValue length = new IntValue();
- private final IntValue hopCnt = new IntValue();
- private final StringValue hopList = new StringValue();
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- String lineStr = new String(bytes, offset, numBytes);
- StringTokenizer st = new StringTokenizer(lineStr, "|");
-
- // path must have exactly 5 tokens (fromNode, toNode, length, hopCnt, hopList)
- if (st.countTokens() != 5) {
- return null;
- }
-
- this.fromNode.setValue(st.nextToken());
- this.toNode.setValue(st.nextToken());
- this.length.setValue(Integer.parseInt(st.nextToken()));
- this.hopCnt.setValue(Integer.parseInt(st.nextToken()));
- this.hopList.setValue(st.nextToken());
-
- target.setField(0, fromNode);
- target.setField(1, toNode);
- target.setField(2, length);
- target.setField(3, hopCnt);
- target.setField(4, hopList);
-
- return target;
- }
- }
-
- /**
- * The PathOutFormat serializes paths to text.
- * In order, the from-node, the to-node, the length, the hop count, and the hop list are written out.
- * Every element, including the last one, is followed by the pipe character ('|'); each path ends with a newline.
- *
- *
- */
- public static class PathOutFormat extends FileOutputFormat {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void writeRecord(Record record) throws IOException {
- StringBuilder line = new StringBuilder();
-
- // append from-node
- line.append(record.getField(0, StringValue.class).toString());
- line.append("|");
- // append to-node
- line.append(record.getField(1, StringValue.class).toString());
- line.append("|");
- // append length
- line.append(record.getField(2, IntValue.class).toString());
- line.append("|");
- // append hopCnt
- line.append(record.getField(3, IntValue.class).toString());
- line.append("|");
- // append hopList
- line.append(record.getField(4, StringValue.class).toString());
- line.append("|");
- line.append("\n");
-
- stream.write(line.toString().getBytes());
- }
- }
-
- /**
- * Concatenates two paths where the from-node of the first path and the to-node of the second path are the same.
- * The second input path becomes the first part and the first input path the second part of the output path.
- * The length of the output path is the sum of both input paths.
- * The output path's hop list is built from both paths' hop lists and the common node.
- */
- @ConstantFieldsFirst(1)
- @ConstantFieldsSecond(0)
- public static class ConcatPaths extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record outputRecord = new Record();
-
- private final IntValue length = new IntValue();
- private final IntValue hopCnt = new IntValue();
- private final StringValue hopList = new StringValue();
-
- @Override
- public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
-
- // rec1 has matching start, rec2 matching end
- // Therefore, rec2's end node and rec1's start node are identical
- // First half of new path will be rec2, second half will be rec1
-
- // Get from-node and to-node of new path
- final StringValue fromNode = rec2.getField(0, StringValue.class);
- final StringValue toNode = rec1.getField(1, StringValue.class);
-
- // Check whether from-node = to-node to prevent circles!
- if (fromNode.equals(toNode)) {
- return;
- }
-
- // Create new path
- outputRecord.setField(0, fromNode);
- outputRecord.setField(1, toNode);
-
- // Compute length of new path
- length.setValue(rec1.getField(2, IntValue.class).getValue() + rec2.getField(2, IntValue.class).getValue());
- outputRecord.setField(2, length);
-
- // compute hop count
- int hops = rec1.getField(3, IntValue.class).getValue() + 1 + rec2.getField(3, IntValue.class).getValue();
- hopCnt.setValue(hops);
- outputRecord.setField(3, hopCnt);
-
- // Concatenate hops lists and insert matching node
- StringBuilder sb = new StringBuilder();
- // first path
- sb.append(rec2.getField(4, StringValue.class).getValue());
- sb.append(" ");
- // common node
- sb.append(rec1.getField(0, StringValue.class).getValue());
- // second path
- sb.append(" ");
- sb.append(rec1.getField(4, StringValue.class).getValue());
-
- hopList.setValue(sb.toString().trim());
- outputRecord.setField(4, hopList);
-
- out.collect(outputRecord);
- }
- }
-
- /**
- * Gets two lists of paths as input and emits for each included from-node/to-node combination the shortest path(s).
- * If for a combination more than one shortest path exists, all shortest paths are emitted.
- *
- *
- */
- @ConstantFieldsFirst({0,1})
- @ConstantFieldsSecond({0,1})
- public static class FindShortestPath extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record outputRecord = new Record();
-
- private final Set<StringValue> shortestPaths = new HashSet<StringValue>();
- private final Map<StringValue,IntValue> hopCnts = new HashMap<StringValue,IntValue>();
- private final IntValue minLength = new IntValue();
-
- @Override
- public void coGroup(Iterator<Record> inputRecords, Iterator<Record> concatRecords, Collector<Record> out) {
-
- // init minimum length and minimum path
- Record pathRec = null;
- StringValue path = null;
- if(inputRecords.hasNext()) {
- // path is in input paths
- pathRec = inputRecords.next();
- } else {
- // path must be in concat paths
- pathRec = concatRecords.next();
- }
- // get from node (common for all paths)
- StringValue fromNode = pathRec.getField(0, StringValue.class);
- // get to node (common for all paths)
- StringValue toNode = pathRec.getField(1, StringValue.class);
- // get length of path
- minLength.setValue(pathRec.getField(2, IntValue.class).getValue());
- // store path and hop count
- path = new StringValue(pathRec.getField(4, StringValue.class));
- shortestPaths.add(path);
- hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
-
- // find shortest path of all input paths
- while (inputRecords.hasNext()) {
- pathRec = inputRecords.next();
- IntValue length = pathRec.getField(2, IntValue.class);
-
- if (length.getValue() == minLength.getValue()) {
- // path has also minimum length add to list
- path = new StringValue(pathRec.getField(4, StringValue.class));
- if(shortestPaths.add(path)) {
- hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
- }
- } else if (length.getValue() < minLength.getValue()) {
- // path has minimum length
- minLength.setValue(length.getValue());
- // clear lists
- hopCnts.clear();
- shortestPaths.clear();
- // get path and add path and hop count
- path = new StringValue(pathRec.getField(4, StringValue.class));
- shortestPaths.add(path);
- hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
- }
- }
-
- // find shortest path of all input and concatenated paths
- while (concatRecords.hasNext()) {
- pathRec = concatRecords.next();
- IntValue length = pathRec.getField(2, IntValue.class);
-
- if (length.getValue() == minLength.getValue()) {
- // path has also minimum length add to list
- path = new StringValue(pathRec.getField(4, StringValue.class));
- if(shortestPaths.add(path)) {
- hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
- }
- } else if (length.getValue() < minLength.getValue()) {
- // path has minimum length
- minLength.setValue(length.getValue());
- // clear lists
- hopCnts.clear();
- shortestPaths.clear();
- // get path and add path and hop count
- path = new StringValue(pathRec.getField(4, StringValue.class));
- shortestPaths.add(path);
- hopCnts.put(path, new IntValue(pathRec.getField(3, IntValue.class).getValue()));
- }
- }
-
- outputRecord.setField(0, fromNode);
- outputRecord.setField(1, toNode);
- outputRecord.setField(2, minLength);
-
- // emit all shortest paths
- for(StringValue shortestPath : shortestPaths) {
- outputRecord.setField(3, hopCnts.get(shortestPath));
- outputRecord.setField(4, shortestPath);
- out.collect(outputRecord);
- }
-
- hopCnts.clear();
- shortestPaths.clear();
-
- }
- }
-
- /**
- * Assembles the Plan of the Pairwise Shortest Paths example Pact program.
- * The program computes one iteration of the Pairwise Shortest Paths algorithm.
- *
- * For the first iteration, two input formats can be chosen:
- * 1) RDF triples with foaf:knows predicates
- * 2) Text-serialized paths (see PathInFormat and PathOutFormat)
- *
- * To choose 1) set the fourth parameter to "true". If set to "false" 2) will be used.
- *
- */
- @Override
- public Plan getPlan(String... args) {
-
- // parse job parameters
- int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String paths = (args.length > 1 ? args[1] : "");
- String output = (args.length > 2 ? args[2] : "");
- boolean rdfInput = (args.length > 3 && Boolean.parseBoolean(args[3]));
-
- FileDataSource pathsInput;
-
- if(rdfInput) {
- pathsInput = new FileDataSource(new RDFTripleInFormat(), paths, "RDF Triples");
- } else {
- pathsInput = new FileDataSource(new PathInFormat(), paths, "Paths");
- }
- pathsInput.setParallelism(numSubTasks);
-
- JoinOperator concatPaths =
- JoinOperator.builder(new ConcatPaths(), StringValue.class, 0, 1)
- .name("Concat Paths")
- .build();
-
- concatPaths.setParallelism(numSubTasks);
-
- CoGroupOperator findShortestPaths =
- CoGroupOperator.builder(new FindShortestPath(), StringValue.class, 0, 0)
- .keyField(StringValue.class, 1, 1)
- .name("Find Shortest Paths")
- .build();
- findShortestPaths.setParallelism(numSubTasks);
-
- FileDataSink result = new FileDataSink(new PathOutFormat(),output, "New Paths");
- result.setParallelism(numSubTasks);
-
- result.setInput(findShortestPaths);
- findShortestPaths.setFirstInput(pathsInput);
- findShortestPaths.setSecondInput(concatPaths);
- concatPaths.setFirstInput(pathsInput);
- concatPaths.setSecondInput(pathsInput);
-
- return new Plan(result, "Pairwise Shortest Paths");
-
- }
-
- @Override
- public String getDescription() {
- return "Parameters: [numSubStasks], [inputPaths], [outputPaths], [RDFInputFlag]";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
deleted file mode 100644
index 0dbb20a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class WorksetConnectedComponents implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- public static final class DuplicateLongMap extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- record.setField(1, record.getField(0, LongValue.class));
- out.collect(record);
- }
- }
-
- /**
- * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
- * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
- * produces a (Target-vertex-ID, Component-ID) pair.
- */
- public static final class NeighborWithComponentIDJoin extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record result = new Record();
-
- @Override
- public void join(Record vertexWithComponent, Record edge, Collector<Record> out) {
- this.result.setField(0, edge.getField(1, LongValue.class));
- this.result.setField(1, vertexWithComponent.getField(1, LongValue.class));
- out.collect(this.result);
- }
- }
-
- /**
- * Minimum aggregation over (Vertex-ID, Component-ID) pairs, selecting the pair with the smallest Component-ID.
- */
- @Combinable
- @ConstantFields(0)
- public static final class MinimumComponentIDReduce extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record result = new Record();
- private final LongValue vertexId = new LongValue();
- private final LongValue minComponentId = new LongValue();
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) {
-
- final Record first = records.next();
- final long vertexID = first.getField(0, LongValue.class).getValue();
-
- long minimumComponentID = first.getField(1, LongValue.class).getValue();
-
- while (records.hasNext()) {
- long candidateComponentID = records.next().getField(1, LongValue.class).getValue();
- if (candidateComponentID < minimumComponentID) {
- minimumComponentID = candidateComponentID;
- }
- }
-
- this.vertexId.setValue(vertexID);
- this.minComponentId.setValue(minimumComponentID);
- this.result.setField(0, this.vertexId);
- this.result.setField(1, this.minComponentId);
- out.collect(this.result);
- }
- }
-
- /**
- * UDF that joins a candidate (Vertex-ID, Component-ID) pair with another (Vertex-ID, Component-ID) pair.
- * Returns the candidate pair, if the candidate's Component-ID is smaller.
- */
- @ConstantFieldsFirst(0)
- public static final class UpdateComponentIdMatch extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out){
-
- long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
- long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
-
- if (candidateComponentID < currentComponentID) {
- out.collect(newVertexWithComponent);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- final String verticesInput = (args.length > 1 ? args[1] : "");
- final String edgeInput = (args.length > 2 ? args[2] : "");
- final String output = (args.length > 3 ? args[3] : "");
- final int maxIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
- // data source for initial vertices
- FileDataSource initialVertices = new FileDataSource(new CsvInputFormat(' ', LongValue.class), verticesInput, "Vertices");
-
- MapOperator verticesWithId = MapOperator.builder(DuplicateLongMap.class).input(initialVertices).name("Assign Vertex Ids").build();
-
- // the loop takes the vertices as the solution set and changed vertices as the workset
- // initially, all vertices are changed
- DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
- iteration.setInitialSolutionSet(verticesWithId);
- iteration.setInitialWorkset(verticesWithId);
- iteration.setMaximumNumberOfIterations(maxIterations);
-
- // data source for the edges
- FileDataSource edges = new FileDataSource(new CsvInputFormat(' ', LongValue.class, LongValue.class), edgeInput, "Edges");
-
- // join workset (changed vertices) with the edges to propagate changes to neighbors
- JoinOperator joinWithNeighbors = JoinOperator.builder(new NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
- .input1(iteration.getWorkset())
- .input2(edges)
- .name("Join Candidate Id With Neighbor")
- .build();
-
- // find for each neighbor the smallest of all candidates
- ReduceOperator minCandidateId = ReduceOperator.builder(new MinimumComponentIDReduce(), LongValue.class, 0)
- .input(joinWithNeighbors)
- .name("Find Minimum Candidate Id")
- .build();
-
- // join candidates with the solution set and update if the candidate component-id is smaller
- JoinOperator updateComponentId = JoinOperator.builder(new UpdateComponentIdMatch(), LongValue.class, 0, 0)
- .input1(minCandidateId)
- .input2(iteration.getSolutionSet())
- .name("Update Component Id")
- .build();
-
- iteration.setNextWorkset(updateComponentId);
- iteration.setSolutionSetDelta(updateComponentId);
-
- // sink is the iteration result
- FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, iteration, "Result");
- CsvOutputFormat.configureRecordFormat(result)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .field(LongValue.class, 0)
- .field(LongValue.class, 1);
-
- Plan plan = new Plan(result, "Workset Connected Components");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <numberOfSubTasks> <vertices> <edges> <out> <maxIterations>";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
deleted file mode 100644
index 81f4d00..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeInputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- *
- */
-public final class EdgeInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- public static final String ID_DELIMITER_CHAR = "edgeinput.delimiter";
-
- private final IntValue i1 = new IntValue();
- private final IntValue i2 = new IntValue();
-
- private char delimiter;
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- final int limit = offset + numBytes;
- int first = 0, second = 0;
- final char delimiter = this.delimiter;
-
- int pos = offset;
- while (pos < limit && bytes[pos] != delimiter) {
- first = first * 10 + (bytes[pos++] - '0');
- }
- pos += 1;// skip the delimiter
- while (pos < limit) {
- second = second * 10 + (bytes[pos++] - '0');
- }
-
- if (first <= 0 || second <= 0 || first == second) {
- return null;
- }
-
- this.i1.setValue(first);
- this.i2.setValue(second);
- target.setField(0, this.i1);
- target.setField(1, this.i2);
- return target;
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public void configure(Configuration parameters) {
- super.configure(parameters);
- this.delimiter = (char) parameters.getInteger(ID_DELIMITER_CHAR, ',');
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
deleted file mode 100644
index 8441602..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesInputFormat.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- * Input format that reads edges augmented with vertex degrees. The data to be read is assumed to be in
- * the format <code>v1,d1|v2,d2\n</code>, where <code>v1</code> and <code>v2</code> are the IDs of the first and
- * second vertex, while <code>d1</code> and <code>d2</code> are the vertex degrees.
- * <p>
- * The result record holds the fields in the sequence <code>(v1, v2, d1, d2)</code>.
- * <p>
- * The delimiters are configurable. The default delimiter between vertex ID and
- * vertex degree is the comma (<code>,</code>). The default delimiter between the two vertices is
- * the vertical bar (<code>|</code>).
- */
-public final class EdgeWithDegreesInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- public static final String VERTEX_DELIMITER_CHAR = "edgeinput.vertexdelimiter";
- public static final String DEGREE_DELIMITER_CHAR = "edgeinput.degreedelimiter";
-
- private final IntValue v1 = new IntValue();
- private final IntValue v2 = new IntValue();
- private final IntValue d1 = new IntValue();
- private final IntValue d2 = new IntValue();
-
- private char vertexDelimiter;
- private char degreeDelimiter;
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- final int limit = offset + numBytes;
- int firstV = 0, secondV = 0;
- int firstD = 0, secondD = 0;
-
- final char vertexDelimiter = this.vertexDelimiter;
- final char degreeDelimiter = this.degreeDelimiter;
-
- int pos = offset;
-
- // read the first vertex ID
- while (pos < limit && bytes[pos] != degreeDelimiter) {
- firstV = firstV * 10 + (bytes[pos++] - '0');
- }
-
- pos += 1;// skip the delimiter
-
- // read the first vertex degree
- while (pos < limit && bytes[pos] != vertexDelimiter) {
- firstD = firstD * 10 + (bytes[pos++] - '0');
- }
-
- pos += 1;// skip the delimiter
-
- // read the second vertex ID
- while (pos < limit && bytes[pos] != degreeDelimiter) {
- secondV = secondV * 10 + (bytes[pos++] - '0');
- }
-
- pos += 1;// skip the delimiter
-
- // read the second vertex degree
- while (pos < limit) {
- secondD = secondD * 10 + (bytes[pos++] - '0');
- }
-
- if (firstV <= 0 || secondV <= 0 || firstV == secondV) {
- return null;
- }
-
- v1.setValue(firstV);
- v2.setValue(secondV);
- d1.setValue(firstD);
- d2.setValue(secondD);
-
- target.setField(0, v1);
- target.setField(1, v2);
- target.setField(2, d1);
- target.setField(3, d2);
-
- return target;
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public void configure(Configuration parameters) {
- super.configure(parameters);
- this.vertexDelimiter = (char) parameters.getInteger(VERTEX_DELIMITER_CHAR, '|');
- this.degreeDelimiter = (char) parameters.getInteger(DEGREE_DELIMITER_CHAR, ',');
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
deleted file mode 100644
index a72a355..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/EdgeWithDegreesOutputFormat.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- *
- */
-public final class EdgeWithDegreesOutputFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder line = new StringBuilder();
-
- @Override
- public int serializeRecord(Record rec, byte[] target) throws Exception {
- final int e1 = rec.getField(0, IntValue.class).getValue();
- final int e2 = rec.getField(1, IntValue.class).getValue();
- final int e3 = rec.getField(2, IntValue.class).getValue();
- final int e4 = rec.getField(3, IntValue.class).getValue();
-
- this.line.setLength(0);
- this.line.append(e1);
- this.line.append(',');
- this.line.append(e3);
- this.line.append('|');
- this.line.append(e2);
- this.line.append(',');
- this.line.append(e4);
-
- if (target.length >= line.length()) {
- for (int i = 0; i < line.length(); i++) {
- target[i] = (byte) line.charAt(i);
- }
- return line.length();
- }
- else {
- return -line.length();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
deleted file mode 100644
index f6c27b0..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/triangleEnumUtil/TriangleOutputFormat.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.triangleEnumUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-
-/**
- *
- */
-public final class TriangleOutputFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder line = new StringBuilder();
-
- @Override
- public int serializeRecord(Record rec, byte[] target) throws Exception {
- final int e1 = rec.getField(0, IntValue.class).getValue();
- final int e2 = rec.getField(1, IntValue.class).getValue();
- final int e3 = rec.getField(2, IntValue.class).getValue();
-
- this.line.setLength(0);
- this.line.append(e1);
- this.line.append(',');
- this.line.append(e2);
- this.line.append(',');
- this.line.append(e3);
-
- if (target.length >= line.length()) {
- for (int i = 0; i < line.length(); i++) {
- target[i] = (byte) line.charAt(i);
- }
- return line.length();
- } else {
- return -line.length();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
deleted file mode 100644
index d528f5d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class KMeansBroadcast implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- int parallelism = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String dataPointInput = (args.length > 1 ? args[1] : "");
- String clusterInput = (args.length > 2 ? args[2] : "");
- String output = (args.length > 3 ? args[3] : "");
- int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 2);
-
- // data source data point input
- @SuppressWarnings("unchecked")
- FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points");
-
- // data source for cluster center input
- @SuppressWarnings("unchecked")
- FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers");
-
- MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build();
-
- MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build();
-
- // ---------------------- Begin K-Means Loop ---------------------
-
- BulkIteration iter = new BulkIteration("k-means loop");
- iter.setInput(clusterPoints);
- iter.setMaximumNumberOfIterations(numIterations);
-
- // compute the distances and select the closest center
- MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter())
- .setBroadcastVariable("centers", iter.getPartialSolution())
- .input(dataPoints)
- .name("Find Nearest Centers")
- .build();
-
- // computing the new cluster positions
- ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
- .input(findNearestClusterCenters)
- .name("Recompute Center Positions")
- .build();
-
- iter.setNextPartialSolution(recomputeClusterCenter);
-
- // ---------------------- End K-Means Loop ---------------------
-
- // create DataSinkContract for writing the new cluster positions
- FileDataSink newClusterPoints = new FileDataSink(new PointOutFormat(), output, iter, "New Center Positions");
-
- Plan plan = new Plan(newClusterPoints, "K-Means");
- plan.setDefaultParallelism(parallelism);
- return plan;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output> <numIterations>";
- }
-
- // --------------------------------------------------------------------------------------------
- // Data Types and UDFs
- // --------------------------------------------------------------------------------------------
-
- /**
- * A simple three-dimensional point.
- */
- public static final class Point implements Value {
- private static final long serialVersionUID = 1L;
-
- public double x, y, z;
-
- public Point() {}
-
- public Point(double x, double y, double z) {
- this.x = x;
- this.y = y;
- this.z = z;
- }
-
- public void add(Point other) {
- x += other.x;
- y += other.y;
- z += other.z;
- }
-
- public Point div(long val) {
- x /= val;
- y /= val;
- z /= val;
- return this;
- }
-
- public double euclideanDistance(Point other) {
- return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y) + (z-other.z)*(z-other.z));
- }
-
- public void clear() {
- x = y = z = 0.0;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeDouble(x);
- out.writeDouble(y);
- out.writeDouble(z);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- x = in.readDouble();
- y = in.readDouble();
- z = in.readDouble();
- }
-
- @Override
- public String toString() {
- return "(" + x + "|" + y + "|" + z + ")";
- }
- }
-
- public static final class PointWithId {
-
- public int id;
- public Point point;
-
- public PointWithId(int id, Point p) {
- this.id = id;
- this.point = p;
- }
- }
-
- /**
- * Determines the closest cluster center for a data point.
- */
- public static final class SelectNearestCenter extends MapFunction {
- private static final long serialVersionUID = 1L;
-
- private final IntValue one = new IntValue(1);
- private final Record result = new Record(3);
-
- private List<PointWithId> centers = new ArrayList<PointWithId>();
-
- /**
- * Reads all the center values from the broadcast variable into a collection.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- List<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
-
- centers.clear();
- synchronized (clusterCenters) {
- for (Record r : clusterCenters) {
- centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
- }
- }
- }
-
- /**
- * Computes a minimum aggregation on the distance of a data point to cluster centers.
- *
- * Output Format:
- * 0: centerID
- * 1: pointVector
- * 2: constant(1) (to enable combinable average computation in the following reducer)
- */
- @Override
- public void map(Record dataPointRecord, Collector<Record> out) {
- Point p = dataPointRecord.getField(1, Point.class);
-
- double nearestDistance = Double.MAX_VALUE;
- int centerId = -1;
-
- // check all cluster centers
- for (PointWithId center : centers) {
- // compute distance
- double distance = p.euclideanDistance(center.point);
-
- // update nearest cluster if necessary
- if (distance < nearestDistance) {
- nearestDistance = distance;
- centerId = center.id;
- }
- }
-
- // emit a new record with the center id and the data point. add a one to ease the
- // implementation of the average function with a combiner
- result.setField(0, new IntValue(centerId));
- result.setField(1, p);
- result.setField(2, one);
-
- out.collect(result);
- }
- }
-
- @Combinable
- public static final class RecomputeClusterCenter extends ReduceFunction {
- private static final long serialVersionUID = 1L;
-
- private final Point p = new Point();
-
-
- /**
- * Compute the new position (coordinate vector) of a cluster center.
- */
- @Override
- public void reduce(Iterator<Record> points, Collector<Record> out) {
- Record sum = sumPointsAndCount(points);
- sum.setField(1, sum.getField(1, Point.class).div(sum.getField(2, IntValue.class).getValue()));
- out.collect(sum);
- }
-
- /**
- * Computes a pre-aggregated average value of a coordinate vector.
- */
- @Override
- public void combine(Iterator<Record> points, Collector<Record> out) {
- out.collect(sumPointsAndCount(points));
- }
-
- private final Record sumPointsAndCount(Iterator<Record> dataPoints) {
- Record next = null;
- p.clear();
- int count = 0;
-
- // compute coordinate vector sum and count
- while (dataPoints.hasNext()) {
- next = dataPoints.next();
- p.add(next.getField(1, Point.class));
- count += next.getField(2, IntValue.class).getValue();
- }
-
- next.setField(1, p);
- next.setField(2, new IntValue(count));
- return next;
- }
- }
-
- public static final class PointBuilder extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- double x = record.getField(1, DoubleValue.class).getValue();
- double y = record.getField(2, DoubleValue.class).getValue();
- double z = record.getField(3, DoubleValue.class).getValue();
-
- record.setField(1, new Point(x, y, z));
- out.collect(record);
- }
- }
-
- public static final class PointOutFormat extends FileOutputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final String format = "%d|%.1f|%.1f|%.1f|\n";
-
- @Override
- public void writeRecord(Record record) throws IOException {
- int id = record.getField(0, IntValue.class).getValue();
- Point p = record.getField(1, Point.class);
-
- byte[] bytes = String.format(format, id, p.x, p.y, p.z).getBytes();
-
- this.stream.write(bytes);
- }
- }
-
- public static void main(String[] args) throws Exception {
- System.out.println(LocalExecutor.optimizerPlanAsJSON(new KMeansBroadcast().getPlan("4", "/dev/random", "/dev/random", "/tmp", "20")));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
deleted file mode 100644
index 8d75d47..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansCross.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.test.recordJobs.kmeans.udfs.ComputeDistance;
-import org.apache.flink.test.recordJobs.kmeans.udfs.FindNearestCenter;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.RecomputeClusterCenter;
-import org.apache.flink.types.IntValue;
-
-@SuppressWarnings("deprecation")
-public class KMeansCross implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- final String dataPointInput = (args.length > 1 ? args[1] : "");
- final String clusterInput = (args.length > 2 ? args[2] : "");
- final String output = (args.length > 3 ? args[3] : "");
- final int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
- // create DataSourceContract for cluster center input
- FileDataSource initialClusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers");
- initialClusterPoints.setParallelism(1);
-
- BulkIteration iteration = new BulkIteration("K-Means Loop");
- iteration.setInput(initialClusterPoints);
- iteration.setMaximumNumberOfIterations(numIterations);
-
- // create DataSourceContract for data point input
- FileDataSource dataPoints = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points");
-
- // create CrossOperator for distance computation
- CrossOperator computeDistance = CrossOperator.builder(new ComputeDistance())
- .input1(dataPoints)
- .input2(iteration.getPartialSolution())
- .name("Compute Distances")
- .build();
-
- // create ReduceOperator for finding the nearest cluster centers
- ReduceOperator findNearestClusterCenters = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
- .input(computeDistance)
- .name("Find Nearest Centers")
- .build();
-
- // create ReduceOperator for computing new cluster positions
- ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
- .input(findNearestClusterCenters)
- .name("Recompute Center Positions")
- .build();
- iteration.setNextPartialSolution(recomputeClusterCenter);
-
- // create DataSourceContract for data point input
- FileDataSource dataPoints2 = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points 2");
-
- // compute distance of points to final clusters
- CrossOperator computeFinalDistance = CrossOperator.builder(new ComputeDistance())
- .input1(dataPoints2)
- .input2(iteration)
- .name("Compute Final Distances")
- .build();
-
- // find nearest final cluster for point
- ReduceOperator findNearestFinalCluster = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
- .input(computeFinalDistance)
- .name("Find Nearest Final Centers")
- .build();
-
- // create DataSinkContract for writing the new cluster positions
- FileDataSink finalClusters = new FileDataSink(new PointOutFormat(), output+"/centers", iteration, "Cluster Positions");
-
- // write assigned clusters
- FileDataSink clusterAssignments = new FileDataSink(new PointOutFormat(), output+"/points", findNearestFinalCluster, "Cluster Assignments");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(finalClusters);
- sinks.add(clusterAssignments);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Iterative KMeans");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output> <numIterations>";
- }
-
- public static void main(String[] args) throws Exception {
- KMeansCross kmi = new KMeansCross();
-
- if (args.length < 5) {
- System.err.println(kmi.getDescription());
- System.exit(1);
- }
-
- Plan plan = kmi.getPlan(args);
-
- // This will execute the kMeans clustering job embedded in a local context.
- LocalExecutor.execute(plan);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
deleted file mode 100644
index bdf7466..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansSingleStep.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class KMeansSingleStep implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String dataPointInput = (args.length > 1 ? args[1] : "");
- String clusterInput = (args.length > 2 ? args[2] : "");
- String output = (args.length > 3 ? args[3] : "");
-
- // create DataSourceContract for data point input
- @SuppressWarnings("unchecked")
- FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points");
-
- // create DataSourceContract for cluster center input
- @SuppressWarnings("unchecked")
- FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers");
-
- MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build();
-
- MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build();
-
- // the mapper computes the distance to all cluster centers, which it draws from a broadcast variable
- MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter())
- .setBroadcastVariable("centers", clusterPoints)
- .input(dataPoints)
- .name("Find Nearest Centers")
- .build();
-
- // create reducer that recomputes the cluster centers as the average of all associated data points
- ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
- .input(findNearestClusterCenters)
- .name("Recompute Center Positions")
- .build();
-
- // create DataSinkContract for writing the new cluster positions
- FileDataSink newClusterPoints = new FileDataSink(new PointOutFormat(), output, recomputeClusterCenter, "New Center Positions");
-
- // return the plan
- Plan plan = new Plan(newClusterPoints, "KMeans Iteration");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output>";
- }
-
- public static final class Point implements Value {
- private static final long serialVersionUID = 1L;
-
- public double x, y, z;
-
- public Point() {}
-
- public Point(double x, double y, double z) {
- this.x = x;
- this.y = y;
- this.z = z;
- }
-
- public void add(Point other) {
- x += other.x;
- y += other.y;
- z += other.z;
- }
-
- public Point div(long val) {
- x /= val;
- y /= val;
- z /= val;
- return this;
- }
-
- public double euclideanDistance(Point other) {
- return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y) + (z-other.z)*(z-other.z));
- }
-
- public void clear() {
- x = y = z = 0.0;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeDouble(x);
- out.writeDouble(y);
- out.writeDouble(z);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- x = in.readDouble();
- y = in.readDouble();
- z = in.readDouble();
- }
-
- @Override
- public String toString() {
- return "(" + x + "|" + y + "|" + z + ")";
- }
- }
-
- public static final class PointWithId {
-
- public int id;
- public Point point;
-
- public PointWithId(int id, Point p) {
- this.id = id;
- this.point = p;
- }
- }
-
- /**
- * Determines the closest cluster center for a data point.
- */
- public static final class SelectNearestCenter extends MapFunction {
- private static final long serialVersionUID = 1L;
-
- private final IntValue one = new IntValue(1);
- private final Record result = new Record(3);
-
- private List<PointWithId> centers = new ArrayList<PointWithId>();
-
- /**
- * Reads all the center values from the broadcast variable into a collection.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- Collection<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
-
- centers.clear();
- for (Record r : clusterCenters) {
- centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
- }
- }
-
- /**
- * Computes a minimum aggregation on the distance of a data point to cluster centers.
- *
- * Output Format:
- * 0: centerID
- * 1: pointVector
- * 2: constant(1) (to enable combinable average computation in the following reducer)
- */
- @Override
- public void map(Record dataPointRecord, Collector<Record> out) {
- Point p = dataPointRecord.getField(1, Point.class);
-
- double nearestDistance = Double.MAX_VALUE;
- int centerId = -1;
-
- // check all cluster centers
- for (PointWithId center : centers) {
- // compute distance
- double distance = p.euclideanDistance(center.point);
-
- // update nearest cluster if necessary
- if (distance < nearestDistance) {
- nearestDistance = distance;
- centerId = center.id;
- }
- }
-
- // emit a new record with the center id and the data point. add a one to ease the
- // implementation of the average function with a combiner
- result.setField(0, new IntValue(centerId));
- result.setField(1, p);
- result.setField(2, one);
-
- out.collect(result);
- }
- }
-
- @Combinable
- public static final class RecomputeClusterCenter extends ReduceFunction {
- private static final long serialVersionUID = 1L;
-
- private final Point p = new Point();
-
-
- /**
- * Compute the new position (coordinate vector) of a cluster center.
- */
- @Override
- public void reduce(Iterator<Record> points, Collector<Record> out) {
- Record sum = sumPointsAndCount(points);
- sum.setField(1, sum.getField(1, Point.class).div(sum.getField(2, IntValue.class).getValue()));
- out.collect(sum);
- }
-
- /**
- * Computes a pre-aggregated average value of a coordinate vector.
- */
- @Override
- public void combine(Iterator<Record> points, Collector<Record> out) {
- out.collect(sumPointsAndCount(points));
- }
-
- private final Record sumPointsAndCount(Iterator<Record> dataPoints) {
- Record next = null;
- p.clear();
- int count = 0;
-
- // compute coordinate vector sum and count
- while (dataPoints.hasNext()) {
- next = dataPoints.next();
- p.add(next.getField(1, Point.class));
- count += next.getField(2, IntValue.class).getValue();
- }
-
- next.setField(1, p);
- next.setField(2, new IntValue(count));
- return next;
- }
- }
-
- public static final class PointBuilder extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- double x = record.getField(1, DoubleValue.class).getValue();
- double y = record.getField(2, DoubleValue.class).getValue();
- double z = record.getField(3, DoubleValue.class).getValue();
-
- record.setField(1, new Point(x, y, z));
- out.collect(record);
- }
- }
-
- public static final class PointOutFormat extends FileOutputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final String format = "%d|%.1f|%.1f|%.1f|\n";
-
- @Override
- public void writeRecord(Record record) throws IOException {
- int id = record.getField(0, IntValue.class).getValue();
- Point p = record.getField(1, Point.class);
-
- byte[] bytes = String.format(format, id, p.x, p.y, p.z).getBytes();
-
- this.stream.write(bytes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
deleted file mode 100644
index ee33113..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Cross PACT computes the distance of all data points to all cluster
- * centers.
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst({0,1})
-public class ComputeDistance extends CrossFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final DoubleValue distance = new DoubleValue();
-
- /**
- * Computes the distance of one data point to one cluster center.
- *
- * Output Format:
- * 0: pointID
- * 1: pointVector
- * 2: clusterID
- * 3: distance
- */
- @Override
- public Record cross(Record dataPointRecord, Record clusterCenterRecord) throws Exception {
-
- CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-
- IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
- CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-
- this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-
- // add cluster center id and distance to the data point record
- dataPointRecord.setField(2, clusterCenterId);
- dataPointRecord.setField(3, this.distance);
-
- return dataPointRecord;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
deleted file mode 100644
index 78b60ef..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistanceParameterized.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Map PACT computes the distance of each data point to all cluster
- * centers, which it reads from the "centers" broadcast variable.
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst({0,1})
-public class ComputeDistanceParameterized extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final DoubleValue distance = new DoubleValue();
-
- private Collection<Record> clusterCenters;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- this.clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
- }
-
- /**
- * Computes the distance of one data point to every cluster center, emitting one record per center.
- *
- * Output Format:
- * 0: pointID
- * 1: pointVector
- * 2: clusterID
- * 3: distance
- */
- @Override
- public void map(Record dataPointRecord, Collector<Record> out) {
-
- CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-
- for (Record clusterCenterRecord : this.clusterCenters) {
- IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
- CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-
- this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-
- // add cluster center id and distance to the data point record
- dataPointRecord.setField(2, clusterCenterId);
- dataPointRecord.setField(3, this.distance);
-
- out.collect(dataPointRecord);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
deleted file mode 100644
index 67e87a3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/CoordVector.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-
-/**
- * Implements a feature vector as a multi-dimensional point. Coordinates of that point
- * (= the features) are stored as double values. The distance between two feature vectors is
- * the Euclidean distance between the points.
- */
-public final class CoordVector implements Key<CoordVector> {
- private static final long serialVersionUID = 1L;
-
- // coordinate array
- private double[] coordinates;
-
- /**
- * Initializes a blank coordinate vector. Required for deserialization!
- */
- public CoordVector() {
- coordinates = null;
- }
-
- /**
- * Initializes a coordinate vector.
- *
- * @param coordinates The coordinate vector of a multi-dimensional point.
- */
- public CoordVector(Double[] coordinates) {
- this.coordinates = new double[coordinates.length];
- for (int i = 0; i < coordinates.length; i++) {
- this.coordinates[i] = coordinates[i];
- }
- }
-
- /**
- * Initializes a coordinate vector.
- *
- * @param coordinates The coordinate vector of a multi-dimensional point.
- */
- public CoordVector(double[] coordinates) {
- this.coordinates = coordinates;
- }
-
- /**
- * Returns the coordinate vector of a multi-dimensional point.
- *
- * @return The coordinate vector of a multi-dimensional point.
- */
- public double[] getCoordinates() {
- return this.coordinates;
- }
-
- /**
- * Sets the coordinate vector of a multi-dimensional point.
- *
- * @param coordinates The dimension values of the point.
- */
- public void setCoordinates(double[] coordinates) {
- this.coordinates = coordinates;
- }
-
- /**
- * Computes the Euclidean distance between this coordinate vector and a
- * second coordinate vector.
- *
- * @param cv The coordinate vector to which the distance is computed.
- * @return The Euclidean distance to coordinate vector cv. If cv has a
- * different length than this coordinate vector, -1 is returned.
- */
- public double computeEuclidianDistance(CoordVector cv) {
- // check coordinate vector lengths
- if (cv.coordinates.length != this.coordinates.length) {
- return -1.0;
- }
-
- double quadSum = 0.0;
- for (int i = 0; i < this.coordinates.length; i++) {
- double diff = this.coordinates[i] - cv.coordinates[i];
- quadSum += diff*diff;
- }
- return Math.sqrt(quadSum);
- }
-
-
- @Override
- public void read(DataInputView in) throws IOException {
- int length = in.readInt();
- this.coordinates = new double[length];
- for (int i = 0; i < length; i++) {
- this.coordinates[i] = in.readDouble();
- }
- }
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeInt(this.coordinates.length);
- for (int i = 0; i < this.coordinates.length; i++) {
- out.writeDouble(this.coordinates[i]);
- }
- }
-
- /**
- * Compares this coordinate vector to another key.
- *
- * @return -1 if the other key is not of type CoordVector. If the other
- * key is also a CoordVector but its length differs from this
- * coordinates vector, -1 is returned if this coordinate vector is
- * smaller and 1 if it is larger. If both coordinate vectors
- * have the same length, the coordinates of both are compared.
- * If a coordinate of this coordinate vector is smaller than the
- * corresponding coordinate of the other vector -1 is returned
- * and 1 otherwise. If all coordinates are identical 0 is
- * returned.
- */
- @Override
- public int compareTo(CoordVector o) {
- // check if both coordinate vectors have identical lengths
- if (o.coordinates.length > this.coordinates.length) {
- return -1;
- }
- else if (o.coordinates.length < this.coordinates.length) {
- return 1;
- }
-
- // compare all coordinates
- for (int i = 0; i < this.coordinates.length; i++) {
- if (o.coordinates[i] > this.coordinates[i]) {
- return -1;
- } else if (o.coordinates[i] < this.coordinates[i]) {
- return 1;
- }
- }
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
deleted file mode 100644
index 1e893ce..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/FindNearestCenter.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Reduce PACT determines the closest cluster center for a data point. This
- * is a minimum aggregation. Hence, a Combiner can be easily implemented.
- */
-@SuppressWarnings("deprecation")
-@Combinable
-@ConstantFields(1)
-public class FindNearestCenter extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final IntValue centerId = new IntValue();
- private final CoordVector position = new CoordVector();
- private final IntValue one = new IntValue(1);
-
- private final Record result = new Record(3);
-
- /**
- * Computes a minimum aggregation on the distance of a data point to
- * cluster centers.
- *
- * Output Format:
- * 0: centerID
- * 1: pointVector
- * 2: constant(1) (to enable combinable average computation in the following reducer)
- */
- @Override
- public void reduce(Iterator<Record> pointsWithDistance, Collector<Record> out) {
- double nearestDistance = Double.MAX_VALUE;
- int nearestClusterId = 0;
-
- // check all cluster centers
- while (pointsWithDistance.hasNext()) {
- Record res = pointsWithDistance.next();
-
- double distance = res.getField(3, DoubleValue.class).getValue();
-
- // compare distances
- if (distance < nearestDistance) {
- // if distance is smaller than smallest till now, update nearest cluster
- nearestDistance = distance;
- nearestClusterId = res.getField(2, IntValue.class).getValue();
- res.getFieldInto(1, this.position);
- }
- }
-
- // emit a new record with the center id and the data point. add a one to ease the
- // implementation of the average function with a combiner
- this.centerId.setValue(nearestClusterId);
- this.result.setField(0, this.centerId);
- this.result.setField(1, this.position);
- this.result.setField(2, this.one);
-
- out.collect(this.result);
- }
-
- // ----------------------------------------------------------------------------------------
-
- private final Record nearest = new Record();
-
- /**
- * Computes a minimum aggregation on the distance of a data point to
- * cluster centers.
- */
- @Override
- public void combine(Iterator<Record> pointsWithDistance, Collector<Record> out) {
- double nearestDistance = Double.MAX_VALUE;
-
- // check all cluster centers
- while (pointsWithDistance.hasNext()) {
- Record res = pointsWithDistance.next();
- double distance = res.getField(3, DoubleValue.class).getValue();
-
- // compare distances
- if (distance < nearestDistance) {
- nearestDistance = distance;
- res.copyTo(this.nearest);
- }
- }
-
- // emit nearest one
- out.collect(this.nearest);
- }
-}