You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:06 UTC
[24/60] Renamed java examples package
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
new file mode 100644
index 0000000..c3cbc6d
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ *
+ * <p>
+ * The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
+ * that closes the triangle.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ * (1)
+ * / \
+ * (2)-(12)
+ * </pre>
+ *
+ * Usage: <code>EnumTriangleBasic &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom Java objects which extend Tuple
+ * <li>Group Sorting
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class EnumTrianglesBasic {
+
+ static boolean fileOutput = false;
+ static String edgePath = null;
+ static String outputPath = null;
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ /**
+  * Entry point of the basic triangle enumeration example.
+  *
+  * <p>Reads the edges, orders the vertices of every edge by id, builds triads from edges
+  * that share their first vertex and keeps only the triads for which a closing edge exists.
+  * The result is written as CSV when file arguments were given, otherwise it is printed.
+  *
+  * @param args either empty (built-in data) or {@code <edge path> <result path>}
+  * @throws Exception if building or executing the program fails
+  */
+ public static void main(String[] args) throws Exception {
+
+     // stop right away when the command line is not usable
+     if (!parseParameters(args)) {
+         return;
+     }
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // load the edges and orient each of them by vertex id
+     DataSet<Edge> edgesById = getEdgeDataSet(env).map(new EdgeByIdProjector());
+
+     // triads: two edges that share their first vertex
+     DataSet<Triad> triads = edgesById
+         .groupBy(Edge.V1)
+         .sortGroup(Edge.V2, Order.ASCENDING)
+         .reduceGroup(new TriadBuilder());
+
+     // triangles: triads whose second and third vertex are connected by an edge
+     DataSet<Triad> triangles = triads
+         .join(edgesById)
+         .where(Triad.V2, Triad.V3)
+         .equalTo(Edge.V1, Edge.V2)
+         .with(new TriadFilter());
+
+     // emit result
+     if (!fileOutput) {
+         triangles.print();
+     } else {
+         triangles.writeAsCsv(outputPath, "\n", ",");
+     }
+
+     // run the assembled program
+     env.execute("Basic Triangle Enumeration Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+  * Map function that turns a pair of vertex ids into an {@link Edge}.
+  * A single {@code Edge} instance is reused as the return value for every record.
+  */
+ public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+     private final Edge reusableEdge = new Edge();
+
+     @Override
+     public Edge map(Tuple2<Integer, Integer> vertexPair) throws Exception {
+         this.reusableEdge.copyVerticesFromTuple2(vertexPair);
+         return this.reusableEdge;
+     }
+ }
+
+ /**
+  * Orients an edge so that its first vertex id is not larger than its second vertex id.
+  * The incoming edge is modified in place and returned.
+  */
+ private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
+
+     @Override
+     public Edge map(Edge edge) throws Exception {
+         final boolean wrongOrder = edge.getFirstVertex() > edge.getSecondVertex();
+         if (wrongOrder) {
+             edge.flipVertices();
+         }
+         return edge;
+     }
+ }
+
+ /**
+  * Group reduce function that combines every pair of edges of a group into a triad.
+  * The first vertex of the first edge becomes the first vertex of all triads of the group;
+  * the second and third triad vertex are the second vertices of an earlier and a later edge.
+  * Assumes that all edges of a group share the first vertex and arrive in ascending order
+  * of the second vertex.
+  */
+ private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+     private final List<Integer> seenVertices = new ArrayList<Integer>();
+     private final Triad reusableTriad = new Triad();
+
+     @Override
+     public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+
+         final Iterator<Edge> it = edgesIter.iterator();
+
+         // start with an empty list for every group
+         seenVertices.clear();
+
+         // the first edge fixes the shared vertex and seeds the list
+         final Edge head = it.next();
+         reusableTriad.setFirstVertex(head.getFirstVertex());
+         seenVertices.add(head.getSecondVertex());
+
+         while (it.hasNext()) {
+             final Integer current = it.next().getSecondVertex();
+
+             // pair the current vertex with every vertex read before it
+             for (int i = 0; i < seenVertices.size(); i++) {
+                 reusableTriad.setSecondVertex(seenVertices.get(i));
+                 reusableTriad.setThirdVertex(current);
+                 out.collect(reusableTriad);
+             }
+             seenVertices.add(current);
+         }
+     }
+ }
+
+ /**
+  * Join function that forwards the triad of every joined (triad, edge) pair unchanged.
+  * Used in a join, triads without a matching closing edge produce no output.
+  */
+ private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
+
+     @Override
+     public Triad join(Triad candidate, Edge closingEdge) throws Exception {
+         // the edge only proves that the triad is closed; it is not part of the result
+         return candidate;
+     }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ /**
+  * Interprets the command line arguments and stores them in the static configuration fields.
+  *
+  * @param args no arguments (use built-in data) or exactly {@code <edge path> <result path>}
+  * @return {@code false} if arguments were given but their number is not two, {@code true} otherwise
+  */
+ private static boolean parseParameters(String[] args) {
+
+     // no arguments: explain the defaults and carry on
+     if (args.length == 0) {
+         System.out.println("Executing Enum Triangles Basic example with built-in default data.");
+         System.out.println(" Provide parameters to read input data from files.");
+         System.out.println(" See the documentation for the correct format of input files.");
+         System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>");
+         return true;
+     }
+
+     // any argument switches to file mode
+     fileOutput = true;
+
+     if (args.length != 2) {
+         System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
+         return false;
+     }
+
+     edgePath = args[0];
+     outputPath = args[1];
+     return true;
+ }
+
+ /**
+  * Provides the edge data set of the program.
+  *
+  * @param env the execution environment used to create the data set
+  * @return the edges read from {@code edgePath} (two space-separated integers per line) in file mode,
+  *         otherwise the default edges from {@link EnumTrianglesData}
+  */
+ private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
+     if (!fileOutput) {
+         return EnumTrianglesData.getDefaultEdgeDataSet(env);
+     }
+     return env.readCsvFile(edgePath)
+         .fieldDelimiter(' ')
+         .includeFields(true, true)
+         .types(Integer.class, Integer.class)
+         .map(new TupleEdgeConverter());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
new file mode 100644
index 0000000..9dcb168
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ *
+ * <p>
+ * The basic algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
+ * that closes the triangle.
+ *
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+ * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
+ * reduce the number of triads.
+ * This implementation extends the basic algorithm by computing output degrees of edge vertices and
+ * grouping edges on the vertex with the smaller degree.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ * (1)
+ * / \
+ * (2)-(12)
+ * </pre>
+ *
+ * Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom Java objects which extend Tuple
+ * <li>Group Sorting
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class EnumTrianglesOpt {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ /**
+  * Entry point of the optimized triangle enumeration example.
+  *
+  * <p>Annotates every edge with the degrees of both of its vertices, orients the edges by degree
+  * for building triads and by vertex id for closing them, and joins both to obtain the triangles.
+  * The result is written as CSV when file arguments were given, otherwise it is printed.
+  *
+  * @param args either empty (built-in data) or {@code <edge path> <result path>}
+  * @throws Exception if building or executing the program fails
+  */
+ public static void main(String[] args) throws Exception {
+
+     // stop right away when the command line is not usable
+     if (!parseParameters(args)) {
+         return;
+     }
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Edge> edges = getEdgeDataSet(env);
+
+     // every edge in both directions, so that each vertex sees all of its edges as "first vertex"
+     DataSet<Edge> bothDirections = edges.flatMap(new EdgeDuplicator());
+
+     // one degree annotation per edge and end point
+     DataSet<EdgeWithDegrees> halfAnnotated = bothDirections
+         .groupBy(Edge.V1)
+         .sortGroup(Edge.V2, Order.ASCENDING)
+         .reduceGroup(new DegreeCounter());
+
+     // merge the two half-annotated copies of every edge
+     DataSet<EdgeWithDegrees> edgesWithDegrees = halfAnnotated
+         .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2)
+         .reduce(new DegreeJoiner());
+
+     // orientation by degree (for triad building) and by vertex id (for closing triads)
+     DataSet<Edge> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector());
+     DataSet<Edge> edgesById = edgesByDegree.map(new EdgeByIdProjector());
+
+     // triads: two edges that share their first vertex
+     DataSet<Triad> triads = edgesByDegree
+         .groupBy(Edge.V1)
+         .sortGroup(Edge.V2, Order.ASCENDING)
+         .reduceGroup(new TriadBuilder());
+
+     // triangles: triads whose second and third vertex are connected by an edge
+     DataSet<Triad> triangles = triads
+         .join(edgesById)
+         .where(Triad.V2, Triad.V3)
+         .equalTo(Edge.V1, Edge.V2)
+         .with(new TriadFilter());
+
+     // emit result
+     if (!fileOutput) {
+         triangles.print();
+     } else {
+         triangles.writeAsCsv(outputPath, "\n", ",");
+     }
+
+     // run the assembled program
+     env.execute("Triangle Enumeration Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+  * Map function that turns a pair of vertex ids into an {@link Edge}.
+  * A single {@code Edge} instance is reused as the return value for every record.
+  */
+ public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+     private final Edge reusableEdge = new Edge();
+
+     @Override
+     public Edge map(Tuple2<Integer, Integer> vertexPair) throws Exception {
+         this.reusableEdge.copyVerticesFromTuple2(vertexPair);
+         return this.reusableEdge;
+     }
+ }
+
+ /**
+  * Emits for an edge the original edge and its switched version.
+  * Both emissions use the same {@link Edge} object, which is flipped in place between them.
+  */
+ private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
+
+ @Override
+ public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
+ // emit the edge as it was received
+ out.collect(edge);
+ // swap the two vertices of the very same object ...
+ edge.flipVertices();
+ // ... and emit it again in the opposite direction
+ // NOTE(review): presumably the collector does not keep a reference to the first emission,
+ // otherwise the flip above would also alter it - confirm against the runtime's collector.
+ out.collect(edge);
+ }
+ }
+
+ /**
+  * Counts the number of distinct neighbors (the degree) of the vertex shared by a group of edges.
+  *
+  * <p>All edges of a group are expected to have the same first vertex. For every distinct second
+  * vertex one {@link EdgeWithDegrees} is emitted. In each emitted edge the first vertex is the vertex
+  * with the smaller id; the counted degree is stored on the side of the group vertex and the degree
+  * of the other vertex is set to 0.
+  *
+  * <p>Edges whose second vertex equals the group vertex (self-loops) are ignored, no matter at which
+  * position of the group they occur. A group that consists of self-loops only emits nothing.
+  */
+ private static class DegreeCounter implements GroupReduceFunction<Edge, EdgeWithDegrees> {
+
+     private final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
+     private final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
+
+     @Override
+     public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
+
+         final Iterator<Edge> edges = edgesIter.iterator();
+         otherVertices.clear();
+
+         // the first edge determines the vertex shared by the whole group
+         Edge edge = edges.next();
+         final Integer groupVertex = edge.getFirstVertex();
+         addNeighbor(groupVertex, edge.getSecondVertex());
+
+         // all remaining edges of the group
+         while (edges.hasNext()) {
+             edge = edges.next();
+             addNeighbor(groupVertex, edge.getSecondVertex());
+         }
+         final int degree = this.otherVertices.size();
+
+         // emit one edge per distinct neighbor, smaller vertex id first
+         for (Integer otherVertex : this.otherVertices) {
+             if (groupVertex < otherVertex) {
+                 outputEdge.setFirstVertex(groupVertex);
+                 outputEdge.setFirstDegree(degree);
+                 outputEdge.setSecondVertex(otherVertex);
+                 outputEdge.setSecondDegree(0);
+             } else {
+                 outputEdge.setFirstVertex(otherVertex);
+                 outputEdge.setFirstDegree(0);
+                 outputEdge.setSecondVertex(groupVertex);
+                 outputEdge.setSecondDegree(degree);
+             }
+             out.collect(outputEdge);
+         }
+     }
+
+     /** Remembers {@code otherVertex} as a neighbor unless it is the group vertex itself or already known. */
+     private void addNeighbor(Integer groupVertex, Integer otherVertex) {
+         // compare by value: '!=' on two boxed Integers compares object references, which only
+         // happens to work for small cached values
+         if (!otherVertex.equals(groupVertex) && !otherVertices.contains(otherVertex)) {
+             otherVertices.add(otherVertex);
+         }
+     }
+ }
+
+ /**
+  * Reduce function that merges two copies of the same edge, each carrying only one degree
+  * annotation, into one edge that carries both. A degree of 0 is treated as "missing".
+  */
+ private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> {
+     private final EdgeWithDegrees merged = new EdgeWithDegrees();
+
+     @Override
+     public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees edge2) throws Exception {
+
+         // start from the first edge
+         merged.copyFrom(edge1);
+
+         final boolean firstMissing = edge1.getFirstDegree() == 0;
+         final boolean secondMissing = edge1.getSecondDegree() == 0;
+
+         // take over the degree that the first edge lacks from the second edge
+         if (firstMissing && !secondMissing) {
+             merged.setFirstDegree(edge2.getFirstDegree());
+         } else if (!firstMissing && secondMissing) {
+             merged.setSecondDegree(edge2.getSecondDegree());
+         }
+         return merged;
+     }
+ }
+
+ /**
+  * Strips the degree annotations from an edge and orients it so that the first vertex is the one
+  * whose degree is not larger than the degree of the second vertex.
+  * A single {@link Edge} instance is reused as the return value for every record.
+  */
+ private static class EdgeByDegreeProjector implements MapFunction<EdgeWithDegrees, Edge> {
+
+     private final Edge reusableEdge = new Edge();
+
+     @Override
+     public Edge map(EdgeWithDegrees annotated) throws Exception {
+
+         // take over the two vertices in their current order
+         reusableEdge.copyVerticesFromEdgeWithDegrees(annotated);
+
+         // swap them when the first vertex has the larger degree
+         final boolean firstIsHeavier = annotated.getFirstDegree() > annotated.getSecondDegree();
+         if (firstIsHeavier) {
+             reusableEdge.flipVertices();
+         }
+         return reusableEdge;
+     }
+ }
+
+ /**
+  * Orients an edge so that its first vertex id is not larger than its second vertex id.
+  * The incoming edge is modified in place and returned.
+  */
+ private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
+
+     @Override
+     public Edge map(Edge edge) throws Exception {
+         final boolean wrongOrder = edge.getFirstVertex() > edge.getSecondVertex();
+         if (wrongOrder) {
+             edge.flipVertices();
+         }
+         return edge;
+     }
+ }
+
+ /**
+  * Group reduce function that combines every pair of edges of a group into a triad.
+  * The first vertex of the first edge becomes the first vertex of all triads of the group;
+  * the second and third triad vertex are the second vertices of an earlier and a later edge.
+  * Assumes that all edges of a group share the first vertex and arrive in ascending order
+  * of the second vertex.
+  */
+ private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+
+     private final List<Integer> seenVertices = new ArrayList<Integer>();
+     private final Triad reusableTriad = new Triad();
+
+     @Override
+     public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+         final Iterator<Edge> it = edgesIter.iterator();
+
+         // start with an empty list for every group
+         seenVertices.clear();
+
+         // the first edge fixes the shared vertex and seeds the list
+         final Edge head = it.next();
+         reusableTriad.setFirstVertex(head.getFirstVertex());
+         seenVertices.add(head.getSecondVertex());
+
+         while (it.hasNext()) {
+             final Integer current = it.next().getSecondVertex();
+
+             // pair the current vertex with every vertex read before it
+             for (int i = 0; i < seenVertices.size(); i++) {
+                 reusableTriad.setSecondVertex(seenVertices.get(i));
+                 reusableTriad.setThirdVertex(current);
+                 out.collect(reusableTriad);
+             }
+             seenVertices.add(current);
+         }
+     }
+ }
+
+ /**
+  * Join function that forwards the triad of every joined (triad, edge) pair unchanged.
+  * Used in a join, triads without a matching closing edge produce no output.
+  */
+ private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
+
+     @Override
+     public Triad join(Triad candidate, Edge closingEdge) throws Exception {
+         // the edge only proves that the triad is closed; it is not part of the result
+         return candidate;
+     }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgePath = null;
+ private static String outputPath = null;
+
+ /**
+  * Interprets the command line arguments and stores them in the static configuration fields.
+  *
+  * <p>File mode ({@code fileOutput}) is only switched on once the arguments have been accepted,
+  * so a rejected command line leaves the configuration untouched.
+  *
+  * @param args no arguments (use built-in data) or exactly {@code <edge path> <result path>}
+  * @return {@code false} if arguments were given but their number is not two, {@code true} otherwise
+  */
+ private static boolean parseParameters(String[] args) {
+
+     if(args.length > 0) {
+         // parse input arguments
+         if(args.length == 2) {
+             fileOutput = true;
+             edgePath = args[0];
+             outputPath = args[1];
+         } else {
+             // name this program (not the basic variant) in the usage hint
+             System.err.println("Usage: EnumTriangleOpt <edge path> <result path>");
+             return false;
+         }
+     } else {
+         System.out.println("Executing Enum Triangles Opt example with built-in default data.");
+         System.out.println(" Provide parameters to read input data from files.");
+         System.out.println(" See the documentation for the correct format of input files.");
+         System.out.println(" Usage: EnumTriangleOpt <edge path> <result path>");
+     }
+     return true;
+ }
+
+ private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgePath)
+ .fieldDelimiter(' ')
+ .includeFields(true, true)
+ .types(Integer.class, Integer.class)
+ .map(new TupleEdgeConverter());
+ } else {
+ return EnumTrianglesData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
new file mode 100644
index 0000000..4a491b4
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.examples.java.graph.util.PageRankData;
+
+/**
+ * A basic implementation of the Page Rank algorithm using a bulk iteration.
+ *
+ * <p>
+ * This implementation requires a set of pages and a set of directed links as input and works as follows. <br>
+ * In each iteration, the rank of every page is evenly distributed to all pages it points to.
+ * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
+ * The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
+ * This implementation terminates after a fixed number of iterations.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Pages are represented as (long) IDs separated by new-line characters.<br>
+ * For example <code>"1\n2\n12\n42\n63\n"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
+ * <li>Links are represented as pairs of page IDs which are separated by space
+ * characters. Links are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br>
+ * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
+ * </ul>
+ *
+ * <p>
+ * Usage: <code>PageRankBasic &lt;pages path&gt; &lt;links path&gt; &lt;output path&gt; &lt;num pages&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations.
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk Iterations
+ * <li>Default Join
+ * <li>Configure user-defined functions using constructor parameters.
+ * </ul>
+ *
+ *
+ */
+@SuppressWarnings("serial")
+public class PageRankBasic {
+
+ private static final double DAMPENING_FACTOR = 0.85;
+ private static final double EPSILON = 0.0001;
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ /**
+  * Entry point of the basic PageRank example.
+  *
+  * <p>Assigns every page the initial rank {@code 1 / numPages}, builds an adjacency list per page
+  * and runs a bulk iteration of at most {@code maxIterations} steps. In each step every page
+  * distributes its rank evenly to its neighbors, the received ranks are summed per page and
+  * dampened. The comparison of old and new ranks is passed to the iteration as termination criterion.
+  * The result is written as CSV when file arguments were given, otherwise it is printed.
+  *
+  * @param args either empty (built-in data) or
+  *             {@code <pages path> <links path> <output path> <num pages> <num iterations>}
+  * @throws Exception if building or executing the program fails
+  */
+ public static void main(String[] args) throws Exception {
+
+     // stop right away when the command line is not usable
+     if (!parseParameters(args)) {
+         return;
+     }
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // input: page ids and directed links
+     DataSet<Long> pagesInput = getPagesDataSet(env);
+     DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
+
+     // every page starts with the same share of the total rank
+     final double initialRank = 1.0d / numPages;
+     DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.map(new RankAssigner(initialRank));
+
+     // one adjacency list per source page
+     DataSet<Tuple2<Long, Long[]>> adjacencyListInput = linksInput
+         .groupBy(0)
+         .reduceGroup(new BuildOutgoingEdgeList());
+
+     // open the bulk iteration
+     IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
+
+     // one step: distribute ranks along the links, sum them up per page, dampen
+     DataSet<Tuple2<Long, Double>> newRanks = iteration
+         .join(adjacencyListInput).where(0).equalTo(0)
+         .flatMap(new JoinVertexWithEdgesMatch())
+         .groupBy(0).aggregate(SUM, 1)
+         .map(new Dampener(DAMPENING_FACTOR, numPages));
+
+     // termination criterion: pages whose rank still changed by more than EPSILON
+     DataSet<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> changedPages = newRanks
+         .join(iteration).where(0).equalTo(0)
+         .filter(new EpsilonFilter());
+
+     DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(newRanks, changedPages);
+
+     // emit result
+     if (!fileOutput) {
+         finalPageRanks.print();
+     } else {
+         finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+     }
+
+     // run the assembled program
+     env.execute("Basic Page Rank Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+  * Map function that pairs every page id with the initial rank passed to the constructor.
+  * A single tuple instance is reused as the return value for every record.
+  */
+ public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
+     Tuple2<Long, Double> outPageWithRank;
+
+     /**
+      * @param rank the rank assigned to every page
+      */
+     public RankAssigner(double rank) {
+         // the page id is a placeholder until the first record is mapped
+         final long placeholderId = -1L;
+         this.outPageWithRank = new Tuple2<Long, Double>(placeholderId, rank);
+     }
+
+     @Override
+     public Tuple2<Long, Double> map(Long page) {
+         this.outPageWithRank.f0 = page;
+         return this.outPageWithRank;
+     }
+ }
+
+ /**
+  * Group reduce function that collects the targets of all links of a group into one array and
+  * emits it together with the source id of the links. Run as a pre-processing step.
+  */
+ @ConstantFields("0")
+ public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+
+     private final ArrayList<Long> targets = new ArrayList<Long>();
+
+     @Override
+     public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+         targets.clear();
+
+         // the source id of the last link read is used for the output (0 if there was no link)
+         Long sourceId = 0L;
+         for (Tuple2<Long, Long> link : values) {
+             sourceId = link.f0;
+             targets.add(link.f1);
+         }
+
+         final Long[] targetArray = targets.toArray(new Long[targets.size()]);
+         out.collect(new Tuple2<Long, Long[]>(sourceId, targetArray));
+     }
+ }
+
+ /**
+  * Flat map function over joined (page with rank, page with adjacency list) pairs that splits
+  * the rank of the page evenly among its neighbors and emits one (neighbor, share) pair each.
+  */
+ public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
+
+     @Override
+     public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
+         final Long[] neighbors = value.f1.f1;
+         final double rank = value.f0.f1;
+
+         // every neighbor receives the same share
+         final double share = rank / ((double) neighbors.length);
+
+         for (Long neighbor : neighbors) {
+             out.collect(new Tuple2<Long, Double>(neighbor, share));
+         }
+     }
+ }
+
+ /**
+  * Map function that applies the dampening formula to a rank:
+  * {@code rank * dampening + (1 - dampening) / numVertices}.
+  * The incoming tuple is updated in place and returned.
+  */
+ @ConstantFields("0")
+ public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+
+     private final double dampening;
+     private final double randomJump;
+
+     /**
+      * @param dampening the dampening factor
+      * @param numVertices the number of vertices over which the random jump share is spread
+      */
+     public Dampener(double dampening, double numVertices) {
+         this.dampening = dampening;
+         this.randomJump = (1 - dampening) / numVertices;
+     }
+
+     @Override
+     public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
+         final double dampened = value.f1 * this.dampening;
+         value.f1 = dampened + this.randomJump;
+         return value;
+     }
+ }
+
+ /**
+  * Filter over pairs of (page, rank) tuples that keeps a pair only if the absolute difference
+  * of the two ranks is larger than {@code EPSILON}.
+  */
+ public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+
+     @Override
+     public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
+         final double difference = value.f0.f1 - value.f1.f1;
+         return Math.abs(difference) > EPSILON;
+     }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String pagesInputPath = null;
+ private static String linksInputPath = null;
+ private static String outputPath = null;
+ private static long numPages = 0;
+ private static int maxIterations = 10;
+
+ /**
+  * Interprets the command line arguments and stores them in the static configuration fields.
+  *
+  * <p>The page count is parsed as a {@code long} to match the type of {@code numPages}. Arguments
+  * are only stored once all of them have been parsed, so a rejected command line leaves the
+  * configuration untouched.
+  *
+  * @param args no arguments (use built-in data and defaults) or exactly
+  *             {@code <pages path> <links path> <output path> <num pages> <num iterations>}
+  * @return {@code false} if arguments were given but their number is not five or a numeric
+  *         argument cannot be parsed, {@code true} otherwise
+  */
+ private static boolean parseParameters(String[] args) {
+
+     if(args.length > 0) {
+         if(args.length == 5) {
+             long parsedNumPages;
+             int parsedMaxIterations;
+             try {
+                 parsedNumPages = Long.parseLong(args[3]);
+                 parsedMaxIterations = Integer.parseInt(args[4]);
+             } catch (NumberFormatException e) {
+                 // report bad numbers like any other usage error instead of failing with a stack trace
+                 System.err.println("<num pages> and <num iterations> must be integer numbers: " + e.getMessage());
+                 System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+                 return false;
+             }
+             fileOutput = true;
+             pagesInputPath = args[0];
+             linksInputPath = args[1];
+             outputPath = args[2];
+             numPages = parsedNumPages;
+             maxIterations = parsedMaxIterations;
+         } else {
+             System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+             return false;
+         }
+     } else {
+         System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+         System.out.println(" Provide parameters to read input data from files.");
+         System.out.println(" See the documentation for the correct format of input files.");
+         System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+
+         numPages = PageRankData.getNumberOfPages();
+     }
+     return true;
+ }
+
+ private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env
+ .readCsvFile(pagesInputPath)
+ .fieldDelimiter(' ')
+ .lineDelimiter("\n")
+ .types(Long.class)
+ .map(new MapFunction<Tuple1<Long>, Long>() {
+ @Override
+ public Long map(Tuple1<Long> v) { return v.f0; }
+ });
+ } else {
+ return PageRankData.getDefaultPagesDataSet(env);
+ }
+ }
+
+ /**
+  * Provides the link data set of the program.
+  *
+  * @param env the execution environment used to create the data set
+  * @return the links read from {@code linksInputPath} (two space-separated longs per line) in file
+  *         mode, otherwise the default links from {@link PageRankData}
+  */
+ private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) {
+     if (!fileOutput) {
+         return PageRankData.getDefaultEdgeDataSet(env);
+     }
+     return env.readCsvFile(linksInputPath)
+         .fieldDelimiter(' ')
+         .lineDelimiter("\n")
+         .types(Long.class, Long.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
new file mode 100644
index 0000000..30230d6
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Computes the transitive closure of a directed graph with a naive bulk iteration.
+ *
+ * <p>
+ * In every superstep each known path (z,x) is extended by each edge (x,y) to a path (z,y).
+ * The extended paths are merged with the already known paths and de-duplicated. The
+ * iteration ends after the maximum number of iterations, or earlier as soon as a superstep
+ * produces no path that was not known before.
+ *
+ * <p>
+ * When parameters are given, edges are read as two long values per line, separated by a
+ * space character. Without parameters the default edge data of
+ * {@link ConnectedComponentsData} is used and the result is printed.
+ */
+@SuppressWarnings("serial")
+public class TransitiveClosureNaive implements ProgramDescription {
+
+    // *************************************************************************
+    //     PROGRAM
+    // *************************************************************************
+
+    public static void main(String... args) throws Exception {
+
+        if (!parseParameters(args)) {
+            return;
+        }
+
+        // set up execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+        IterativeDataSet<Tuple2<Long, Long>> paths = edges.iterate(maxIterations);
+
+        DataSet<Tuple2<Long, Long>> nextPaths = paths
+                .join(edges)
+                .where(1)
+                .equalTo(0)
+                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                    // left:  Path (z,x) - x is reachable by z
+                    // right: Edge (x,y) - edge x-->y exists
+                    // out:   Path (z,y) - y is reachable by z
+                    @Override
+                    public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+                        // the ids are immutable Long objects, so they can be shared instead of re-boxed
+                        return new Tuple2<Long, Long>(left.f0, right.f1);
+                    }
+                })
+                .union(paths)
+                .groupBy(0, 1)
+                .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                    @Override
+                    public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+                        // all values of a group are identical paths; keep exactly one of them
+                        out.collect(values.iterator().next());
+                    }
+                });
+
+        // paths that were discovered in this superstep, i.e. that are in nextPaths but not in paths
+        DataSet<Tuple2<Long, Long>> newPaths = paths
+                .coGroup(nextPaths)
+                .where(0).equalTo(0)
+                .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                    @Override
+                    public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
+                        // local to the call: only paths with the same start vertex can match,
+                        // so nothing has to be remembered across groups
+                        Set<Tuple2<Long, Long>> prevSet = new HashSet<Tuple2<Long, Long>>();
+                        for (Tuple2<Long, Long> prev : prevPaths) {
+                            prevSet.add(prev);
+                        }
+                        for (Tuple2<Long, Long> next : nextPaths) {
+                            if (!prevSet.contains(next)) {
+                                out.collect(next);
+                            }
+                        }
+                    }
+                });
+
+        // newPaths is the termination criterion: the iteration stops early once it is empty
+        DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
+
+        // emit result
+        if (fileOutput) {
+            transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+        } else {
+            transitiveClosure.print();
+        }
+
+        // execute program
+        env.execute("Transitive Closure Example");
+    }
+
+    @Override
+    public String getDescription() {
+        return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+    }
+
+    // *************************************************************************
+    //     UTIL METHODS
+    // *************************************************************************
+
+    private static boolean fileOutput = false;
+    private static String edgesPath = null;
+    private static String outputPath = null;
+    private static int maxIterations = 10;
+
+    /**
+     * Reads the program arguments into the static configuration fields.
+     *
+     * @param programArguments either no arguments (built-in data, printed result) or exactly
+     *        {@code <edges path> <result path> <max number of iterations>}
+     * @return {@code false} if arguments were given but their number is not three,
+     *         {@code true} otherwise
+     */
+    private static boolean parseParameters(String[] programArguments) {
+
+        if (programArguments.length > 0) {
+            // parse input arguments
+            fileOutput = true;
+            if (programArguments.length == 3) {
+                edgesPath = programArguments[0];
+                outputPath = programArguments[1];
+                maxIterations = Integer.parseInt(programArguments[2]);
+            } else {
+                System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+                return false;
+            }
+        } else {
+            System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+            System.out.println(" Provide parameters to read input data from files.");
+            System.out.println(" See the documentation for the correct format of input files.");
+            System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+        }
+        return true;
+    }
+
+    /**
+     * Provides the edges: read from the file at {@code edgesPath} when parameters were
+     * given, otherwise the default edges of {@link ConnectedComponentsData}.
+     */
+    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+        if (fileOutput) {
+            return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+        } else {
+            return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
new file mode 100644
index 0000000..27c7d45
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the Connected Components example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class ConnectedComponentsData {
+
+ public static final Object[][] VERTICES = new Object[][] {
+ new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L},
+ new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L},
+ new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L},
+ new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
+ };
+
+ public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
+ List<Long> verticesList = new LinkedList<Long>();
+ for (Object[] vertex : VERTICES) {
+ verticesList.add((Long) vertex[0]);
+ }
+ return env.fromCollection(verticesList);
+ }
+
+ public static final Object[][] EDGES = new Object[][] {
+ new Object[]{1L, 2L},
+ new Object[]{2L, 3L},
+ new Object[]{2L, 4L},
+ new Object[]{3L, 5L},
+ new Object[]{6L, 7L},
+ new Object[]{8L, 9L},
+ new Object[]{8L, 10L},
+ new Object[]{5L, 11L},
+ new Object[]{11L, 12L},
+ new Object[]{10L, 13L},
+ new Object[]{9L, 14L},
+ new Object[]{13L, 14L},
+ new Object[]{1L, 15L},
+ new Object[]{16L, 1L}
+ };
+
+ public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
+ for (Object[] edge : EDGES) {
+ edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
+ }
+ return env.fromCollection(edgeList);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
new file mode 100644
index 0000000..257b7d8
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+
+/**
+ * Default input data of the Triangle Enumeration example programs.
+ * It is used whenever a program is started without parameters.
+ */
+public class EnumTrianglesData {
+
+    /** Default edges; each row holds the two vertex ids of an edge as {@code Integer} values. */
+    public static final Object[][] EDGES = new Object[][] {
+        new Object[] {1, 2},
+        new Object[] {1, 3},
+        new Object[] {1, 4},
+        new Object[] {1, 5},
+        new Object[] {2, 3},
+        new Object[] {2, 5},
+        new Object[] {3, 4},
+        new Object[] {3, 7},
+        new Object[] {3, 8},
+        new Object[] {5, 6},
+        new Object[] {7, 8}
+    };
+
+    /**
+     * Creates a data set with one {@link Edge} per row of {@link #EDGES}.
+     */
+    public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+        List<Edge> defaultEdges = new ArrayList<Edge>();
+        for (int i = 0; i < EDGES.length; i++) {
+            Integer first = (Integer) EDGES[i][0];
+            Integer second = (Integer) EDGES[i][1];
+            defaultEdges.add(new Edge(first, second));
+        }
+        return env.fromCollection(defaultEdges);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
new file mode 100644
index 0000000..d7eefae
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * Tuple-based data types with named accessors for the vertex and degree fields.
+ */
+public class EnumTrianglesDataTypes {
+
+    /** A pair of vertex ids, stored in the tuple fields 0 and 1. */
+    public static class Edge extends Tuple2<Integer, Integer> {
+        private static final long serialVersionUID = 1L;
+
+        public static final int V1 = 0;
+        public static final int V2 = 1;
+
+        public Edge() {}
+
+        public Edge(final Integer v1, final Integer v2) {
+            this.setFirstVertex(v1);
+            this.setSecondVertex(v2);
+        }
+
+        public Integer getFirstVertex() {
+            return this.f0;
+        }
+
+        public Integer getSecondVertex() {
+            return this.f1;
+        }
+
+        public void setFirstVertex(final Integer vertex1) {
+            this.f0 = vertex1;
+        }
+
+        public void setSecondVertex(final Integer vertex2) {
+            this.f1 = vertex2;
+        }
+
+        /** Takes over both fields of the given tuple. */
+        public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
+            this.setFirstVertex(t.f0);
+            this.setSecondVertex(t.f1);
+        }
+
+        /** Takes over the two vertex ids of the given edge; its degrees are not read. */
+        public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) {
+            this.setFirstVertex(ewd.getFirstVertex());
+            this.setSecondVertex(ewd.getSecondVertex());
+        }
+
+        /** Swaps the first and the second vertex. */
+        public void flipVertices() {
+            final Integer formerSecond = this.getSecondVertex();
+            this.setSecondVertex(this.getFirstVertex());
+            this.setFirstVertex(formerSecond);
+        }
+    }
+
+    /** Three vertex ids, stored in the tuple fields 0, 1 and 2. */
+    public static class Triad extends Tuple3<Integer, Integer, Integer> {
+        private static final long serialVersionUID = 1L;
+
+        public static final int V1 = 0;
+        public static final int V2 = 1;
+        public static final int V3 = 2;
+
+        public Triad() {}
+
+        public void setFirstVertex(final Integer vertex1) {
+            this.f0 = vertex1;
+        }
+
+        public void setSecondVertex(final Integer vertex2) {
+            this.f1 = vertex2;
+        }
+
+        public void setThirdVertex(final Integer vertex3) {
+            this.f2 = vertex3;
+        }
+    }
+
+    /** Two vertex ids (tuple fields 0 and 1) together with two degrees (tuple fields 2 and 3). */
+    public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
+        private static final long serialVersionUID = 1L;
+
+        public static final int V1 = 0;
+        public static final int V2 = 1;
+        public static final int D1 = 2;
+        public static final int D2 = 3;
+
+        public EdgeWithDegrees() { }
+
+        public Integer getFirstVertex() {
+            return this.f0;
+        }
+
+        public Integer getSecondVertex() {
+            return this.f1;
+        }
+
+        public Integer getFirstDegree() {
+            return this.f2;
+        }
+
+        public Integer getSecondDegree() {
+            return this.f3;
+        }
+
+        public void setFirstVertex(final Integer vertex1) {
+            this.f0 = vertex1;
+        }
+
+        public void setSecondVertex(final Integer vertex2) {
+            this.f1 = vertex2;
+        }
+
+        public void setFirstDegree(final Integer degree1) {
+            this.f2 = degree1;
+        }
+
+        public void setSecondDegree(final Integer degree2) {
+            this.f3 = degree2;
+        }
+
+        /** Takes over all four fields of the given edge. */
+        public void copyFrom(final EdgeWithDegrees edge) {
+            this.setFirstVertex(edge.getFirstVertex());
+            this.setSecondVertex(edge.getSecondVertex());
+            this.setFirstDegree(edge.getFirstDegree());
+            this.setSecondDegree(edge.getSecondDegree());
+        }
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
new file mode 100644
index 0000000..d6ee274
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Provides the default data sets used for the PageRank example program.
+ * The default data sets are used, if no parameters are given to the program.
+ */
+public class PageRankData {
+
+    /** Default links; each row holds the two page ids of a link as {@code Long} values. */
+    public static final Object[][] EDGES = {
+        {1L, 2L},
+        {1L, 15L},
+        {2L, 3L},
+        {2L, 4L},
+        {2L, 5L},
+        {2L, 6L},
+        {2L, 7L},
+        {3L, 13L},
+        {4L, 2L},
+        {5L, 11L},
+        {5L, 12L},
+        {6L, 1L},
+        {6L, 7L},
+        {6L, 8L},
+        {7L, 1L},
+        {7L, 8L},
+        {8L, 1L},
+        {8L, 9L},
+        {8L, 10L},
+        {9L, 14L},
+        {9L, 1L},
+        {10L, 1L},
+        {10L, 13L},
+        {11L, 12L},
+        {11L, 1L},
+        {12L, 1L},
+        {13L, 14L},
+        {14L, 12L},
+        {15L, 1L},
+    };
+
+    /**
+     * Number of default pages. Single source for both the generated page ids and
+     * {@link #getNumberOfPages()}, so the two cannot drift apart.
+     */
+    private static final long NUM_PAGES = 15;
+
+    /**
+     * Creates a data set with one tuple per row of {@link #EDGES}.
+     */
+    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+        List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
+        for (Object[] e : EDGES) {
+            edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
+        }
+        return env.fromCollection(edges);
+    }
+
+    /**
+     * Creates a data set with the page ids 1 to {@link #getNumberOfPages()} (inclusive).
+     */
+    public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
+        return env.generateSequence(1, NUM_PAGES);
+    }
+
+    /**
+     * @return the number of pages in the default pages data set
+     */
+    public static long getNumberOfPages() {
+        return NUM_PAGES;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
new file mode 100644
index 0000000..0d4fd3e
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ *
+ * Thus Pi = 4 * (area of circle / area of square).
+ *
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points (within the square)
+ * and then counting the number of points that fall within the circle
+ *
+ * <pre>
+ * {@code
+ * x = Math.random()
+ * y = Math.random()
+ *
+ * x * x + y * y < 1
+ * }
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class PiEstimation implements java.io.Serializable {
+
+
+    public static void main(String[] args) throws Exception {
+
+        // number of random points to draw; first program argument, 1000000 if none is given
+        final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
+
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // count how many of the samples would randomly fall into
+        // the unit circle
+        DataSet<Long> count =
+                env.generateSequence(1, numSamples)
+                .map(new Sampler())
+                .reduce(new SumReducer());
+
+        // the samples are drawn from the unit square, of which the unit circle covers a
+        // quarter-circle of area pi / 4; hence pi is about 4 * (hits / samples)
+        DataSet<Double> pi = count
+                .map(new MapFunction<Long, Double>() {
+                    @Override
+                    public Double map(Long value) {
+                        return value * 4.0 / numSamples;
+                    }
+                });
+
+        System.out.println("We estimate Pi to be:");
+        pi.print();
+
+        env.execute();
+    }
+
+    //*************************************************************************
+    //     USER FUNCTIONS
+    //*************************************************************************
+
+
+    /**
+     * Draws one random point (x, y) with both coordinates in [0, 1) per input value and
+     * checks whether it lies inside the circle of radius 1 around the origin.
+     * Emits 1 if {@code x * x + y * y < 1}, and 0 otherwise. The input value itself is ignored.
+     */
+    public static class Sampler implements MapFunction<Long, Long> {
+
+        @Override
+        public Long map(Long value) throws Exception {
+            double x = Math.random();
+            double y = Math.random();
+            return (x * x + y * y) < 1 ? 1L : 0L;
+        }
+    }
+
+
+    /**
+     * Simply sums up all long values.
+     */
+    public static final class SumReducer implements ReduceFunction<Long> {
+
+        @Override
+        public Long reduce(Long value1, Long value2) throws Exception {
+            return value1 + value2;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
new file mode 100644
index 0000000..7940310
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.ml;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.ml.util.LinearRegressionData;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+/**
+ * This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
+ *
+ * <p>
+ * Linear Regression with the BGD (batch gradient descent) algorithm is an iterative optimization algorithm and works as follows:<br>
+ * Given a data set and a target set, BGD tries to find the best parameters for the data set to fit the target set.
+ * In each iteration, the algorithm computes the gradient of the cost function and uses it to update all the parameters.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation).
+ * With enough iterations, the algorithm can minimize the cost function and find the best parameters.
+ * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
+ *
+ * <p>
+ * This implementation works on one-dimensional data and finds the two-dimensional theta.<br>
+ * It finds the best theta parameters to fit the target.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Data points are represented as two double values separated by a blank character. The first one represents X (the training data) and the second represents Y (the target).
+ * Data points are separated by newline characters.<br>
+ * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
+ * </ul>
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> Bulk iterations
+ * <li> Broadcast variables in bulk iterations
+ * <li> Custom Java objects (PoJos)
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class LinearRegression {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception{
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up execution environment
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input x data from elements
+ DataSet<Data> data = getDataSet(env);
+
+ // get the parameters from elements
+ DataSet<Params> parameters = getParamsDataSet(env);
+
+ // set number of bulk iterations for SGD linear Regression
+ IterativeDataSet<Params> loop = parameters.iterate(numIterations);
+
+ DataSet<Params> new_parameters = data
+ // compute a single step using every sample
+ .map(new SubUpdate()).withBroadcastSet(loop, "parameters")
+ // sum up all the steps
+ .reduce(new UpdateAccumulator())
+ // average the steps and update all parameters
+ .map(new Update());
+
+ // feed new parameters back into next iteration
+ DataSet<Params> result = loop.closeWith(new_parameters);
+
+ // emit result
+ if(fileOutput) {
+ result.writeAsText(outputPath);
+ } else {
+ result.print();
+ }
+
+ // execute program
+ env.execute("Linear Regression example");
+
+ }
+
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ /**
+ * A simple data sample, x means the input, and y means the target.
+ */
+ public static class Data implements Serializable{
+ public double x,y;
+
+ public Data() {};
+
+ public Data(double x ,double y){
+ this.x = x;
+ this.y = y;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + x + "|" + y + ")";
+ }
+
+ }
+
+ /**
+ * A set of parameters -- theta0, theta1.
+ */
+ public static class Params implements Serializable{
+
+ private double theta0,theta1;
+
+ public Params(){};
+
+ public Params(double x0, double x1){
+ this.theta0 = x0;
+ this.theta1 = x1;
+ }
+
+ @Override
+ public String toString() {
+ return theta0 + " " + theta1;
+ }
+
+ public double getTheta0() {
+ return theta0;
+ }
+
+ public double getTheta1() {
+ return theta1;
+ }
+
+ public void setTheta0(double theta0) {
+ this.theta0 = theta0;
+ }
+
+ public void setTheta1(double theta1) {
+ this.theta1 = theta1;
+ }
+
+ public Params div(Integer a){
+ this.theta0 = theta0 / a ;
+ this.theta1 = theta1 / a ;
+ return this;
+ }
+
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /** Converts a {@code Tuple2<Double, Double>} into a {@link Data}: field 0 becomes x, field 1 becomes y. */
+ public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> {
+
+     @Override
+     public Data map(Tuple2<Double, Double> t) throws Exception {
+         final Double x = t.f0;
+         final Double y = t.f1;
+         return new Data(x, y);
+     }
+ }
+
+ /** Converts a {@code Tuple2<Double, Double>} into a {@link Params}: field 0 becomes theta0, field 1 becomes theta1. */
+ public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>, Params> {
+
+     @Override
+     public Params map(Tuple2<Double, Double> t) throws Exception {
+         final Double theta0 = t.f0;
+         final Double theta1 = t.f1;
+         return new Params(theta0, theta1);
+     }
+ }
+
+ /**
+ * Compute a single BGD type update for every parameters.
+ */
+ public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> {
+
+ private Collection<Params> parameters;
+
+ private Params parameter;
+
+ private int count = 1;
+
+ /** Reads the parameters from a broadcast variable into a collection. */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.parameters = getRuntimeContext().getBroadcastVariable("parameters");
+ }
+
+ @Override
+ public Tuple2<Params, Integer> map(Data in) throws Exception {
+
+ for(Params p : parameters){
+ this.parameter = p;
+ }
+
+ double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
+ double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
+
+ return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count);
+ }
+ }
+
+ /**
+  * Accumulates the updates: adds up the parameters and the counts of two partial results.
+  */
+ public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
+
+     @Override
+     public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
+
+         final Params left = val1.f0;
+         final Params right = val2.f0;
+
+         Params sum = new Params(
+                 left.getTheta0() + right.getTheta0(),
+                 left.getTheta1() + right.getTheta1());
+         int count = val1.f1 + val2.f1;
+
+         return new Tuple2<Params, Integer>(sum, count);
+     }
+ }
+
+ /**
+  * Computes the final update by averaging: divides the summed parameters (field 0)
+  * by the count (field 1).
+  */
+ public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> {
+
+     @Override
+     public Params map(Tuple2<Params, Integer> arg0) throws Exception {
+         final Params summed = arg0.f0;
+         final Integer count = arg0.f1;
+         return summed.div(count);
+     }
+
+ }
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String dataPath = null;
+ private static String outputPath = null;
+ private static int numIterations = 10;
+
+ /**
+  * Reads the program arguments into the static configuration fields.
+  *
+  * @param programArguments either no arguments (built-in data, printed result) or exactly
+  *        {@code <data path> <result path> <num iterations>}
+  * @return {@code false} if arguments were given but their number is not three,
+  *         {@code true} otherwise
+  */
+ private static boolean parseParameters(String[] programArguments) {
+
+     // no arguments: keep the defaults and explain how to pass input files
+     if (programArguments.length == 0) {
+         System.out.println("Executing Linear Regression example with default parameters and built-in default data.");
+         System.out.println(" Provide parameters to read input data from files.");
+         System.out.println(" See the documentation for the correct format of input files.");
+         System.out.println(" We provide a data generator to create synthetic input files for this program.");
+         System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>");
+         return true;
+     }
+
+     // parse input arguments
+     fileOutput = true;
+     if (programArguments.length != 3) {
+         System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>");
+         return false;
+     }
+
+     dataPath = programArguments[0];
+     outputPath = programArguments[1];
+     numIterations = Integer.parseInt(programArguments[2]);
+     return true;
+ }
+
+ /**
+  * Provides the samples: read from the file at {@code dataPath} (two space-separated
+  * double values per line) when parameters were given, otherwise the default data of
+  * {@link LinearRegressionData}.
+  */
+ private static DataSet<Data> getDataSet(ExecutionEnvironment env) {
+     if (!fileOutput) {
+         return LinearRegressionData.getDefaultDataDataSet(env);
+     }
+
+     // read data from CSV file
+     return env.readCsvFile(dataPath)
+             .fieldDelimiter(' ')
+             .includeFields(true, true)
+             .types(Double.class, Double.class)
+             .map(new TupleDataConverter());
+ }
+
+ /**
+  * Provides the initial parameters; they always come from {@link LinearRegressionData},
+  * regardless of the program arguments.
+  */
+ private static DataSet<Params> getParamsDataSet(ExecutionEnvironment env) {
+     DataSet<Params> initialParameters = LinearRegressionData.getDefaultParamsDataSet(env);
+     return initialParameters;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
new file mode 100644
index 0000000..84d332f
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.ml.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.ml.LinearRegression.Data;
+import org.apache.flink.examples.java.ml.LinearRegression.Params;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Linear Regression example
+ * program. The default data sets are used, if no parameters are given to the
+ * program.
+ */
+public class LinearRegressionData {
+
+ // We have the data as object arrays so that we can also generate Scala Data
+ // Sources from it.
+ public static final Object[][] PARAMS = new Object[][] { new Object[] {
+ 0.0, 0.0 } };
+
+ public static final Object[][] DATA = new Object[][] {
+ new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 },
+ new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 },
+ new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 },
+ new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 },
+ new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 },
+ new Object[] { 10.0, 20.0 }, new Object[] { -0.08, -0.16 },
+ new Object[] { 0.13, 0.26 }, new Object[] { -1.17, -2.35 },
+ new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 },
+ new Object[] { 1.20, 2.41 }, new Object[] { -0.59, -1.18 },
+ new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
+ new Object[] { -0.55, -1.08 } };
+
+ public static DataSet<Params> getDefaultParamsDataSet(
+ ExecutionEnvironment env) {
+ List<Params> paramsList = new LinkedList<Params>();
+ for (Object[] params : PARAMS) {
+ paramsList.add(new Params((Double) params[0], (Double) params[1]));
+ }
+ return env.fromCollection(paramsList);
+ }
+
+ public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
+
+ List<Data> dataList = new LinkedList<Data>();
+ for (Object[] data : DATA) {
+ dataList.add(new Data((Double) data[0], (Double) data[1]));
+ }
+ return env.fromCollection(dataList);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
new file mode 100644
index 0000000..148c607
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.ml.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Generates data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
+ */
+public class LinearRegressionDataGenerator {
+
+ static {
+ Locale.setDefault(Locale.US);
+ }
+
+ private static final String POINTS_FILE = "data";
+ private static final long DEFAULT_SEED = 4650285087650871364L;
+ private static final int DIMENSIONALITY = 1;
+ private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
+ private static final char DELIMITER = ' ';
+
+ /**
+ * Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
+ * <p>
+ * The generator creates to files:
+ * <ul>
+ * <li><code>{tmp.dir}/data</code> for the data points
+ * </ul>
+ *
+ * @param args
+ * <ol>
+ * <li>Int: Number of data points
+ * <li><b>Optional</b> Long: Random seed
+ * </ol>
+ */
+ public static void main(String[] args) throws IOException {
+
+ // check parameter count
+ if (args.length < 1) {
+ System.out.println("LinearRegressionDataGenerator <numberOfDataPoints> [<seed>]");
+ System.exit(1);
+ }
+
+ // parse parameters
+ final int numDataPoints = Integer.parseInt(args[0]);
+ final long firstSeed = args.length > 1 ? Long.parseLong(args[4]) : DEFAULT_SEED;
+ final Random random = new Random(firstSeed);
+ final String tmpDir = System.getProperty("java.io.tmpdir");
+
+ // write the points out
+ BufferedWriter pointsOut = null;
+ try {
+ pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
+ StringBuilder buffer = new StringBuilder();
+
+ // DIMENSIONALITY + 1 means that the number of x(dimensionality) and target y
+ double[] point = new double[DIMENSIONALITY+1];
+
+ for (int i = 1; i <= numDataPoints; i++) {
+ point[0] = random.nextGaussian();
+ point[1] = 2 * point[0] + 0.01*random.nextGaussian();
+ writePoint(point, buffer, pointsOut);
+ }
+
+ }
+ finally {
+ if (pointsOut != null) {
+ pointsOut.close();
+ }
+ }
+
+ System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
+ }
+
+
+ private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
+ buffer.setLength(0);
+
+ // write coordinates
+ for (int j = 0; j < data.length; j++) {
+ buffer.append(FORMAT.format(data[j]));
+ if(j < data.length - 1) {
+ buffer.append(DELIMITER);
+ }
+ }
+
+ out.write(buffer.toString());
+ out.newLine();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
new file mode 100644
index 0000000..6faafe0
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+/**
+ * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
+ * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
+ * most contain whitespace characters like space and tab.
+ * <p>
+ * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
+ * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration.
+ * <p>
+ * Usage: <code>FilterAndCountIncompleteLines [<input file path> [<result path>]]</code> <br>
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>custom accumulators
+ * <li>tuple data types
+ * <li>inline-defined functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class EmptyFieldsCountAccumulator {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get the data set
+ final DataSet<Tuple> file = getDataSet(env);
+
+ // filter lines with empty fields
+ final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
+
+ // Here, we could do further processing with the filtered lines...
+
+ // output the filtered lines
+ if (outputPath == null) {
+ filteredLines.print();
+ } else {
+ filteredLines.writeAsCsv(outputPath);
+ }
+
+ // execute program
+ final JobExecutionResult result = env.execute("Accumulator example");
+
+ // get the accumulator result via its registration key
+ final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
+ System.out.format("Number of detected empty fields per column: %s\n", emptyFields);
+
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static String filePath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] programArguments) {
+
+ if (programArguments.length >= 3) {
+ System.err.println("Usage: FilterAndCountIncompleteLines [<input file path> [<result path>]]");
+ return false;
+ }
+
+ if (programArguments.length >= 1) {
+ filePath = programArguments[0];
+ if (programArguments.length == 2) {
+ outputPath = programArguments[1];
+ }
+ }
+
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataSet<Tuple> getDataSet(final ExecutionEnvironment env) {
+
+ DataSet<? extends Tuple> source;
+ if (filePath == null) {
+ source = env.fromCollection(getExampleInputTuples());
+
+ } else {
+ source = env
+ .readCsvFile(filePath)
+ .fieldDelimiter(';')
+ .types(String.class, String.class, String.class);
+
+ }
+
+ return (DataSet<Tuple>) source;
+ }
+
+ private static Collection<Tuple3<String, String, String>> getExampleInputTuples() {
+ Collection<Tuple3<String, String, String>> inputTuples = new ArrayList<Tuple3<String, String, String>>();
+ inputTuples.add(new Tuple3<String, String, String>("John", "Doe", "Foo Str."));
+ inputTuples.add(new Tuple3<String, String, String>("Joe", "Johnson", ""));
+ inputTuples.add(new Tuple3<String, String, String>(null, "Kate Morn", "Bar Blvd."));
+ inputTuples.add(new Tuple3<String, String, String>("Tim", "Rinny", ""));
+ inputTuples.add(new Tuple3<String, String, String>("Alicia", "Jackson", " "));
+ return inputTuples;
+ }
+
+ /**
+ * This function filters all incoming tuples that have one or more empty fields.
+ * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
+ * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
+ */
+ public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> {
+
+ // create a new accumulator in each filter function instance
+ // accumulators can be merged later on
+ private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
+
+ @Override
+ public void open(final Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ // register the accumulator instance
+ getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
+ this.emptyFieldCounter);
+ }
+
+ @Override
+ public boolean filter(final Tuple t) {
+ boolean containsEmptyFields = false;
+
+ // iterate over the tuple fields looking for empty ones
+ for (int pos = 0; pos < t.getArity(); pos++) {
+
+ final String field = t.getField(pos);
+ if (field == null || field.trim().isEmpty()) {
+ containsEmptyFields = true;
+
+ // if an empty field is encountered, update the
+ // accumulator
+ this.emptyFieldCounter.add(pos);
+ }
+ }
+
+ return !containsEmptyFields;
+ }
+ }
+
+ /**
+ * This accumulator lets you increase vector components distributedly. The {@link #add(Integer)} method lets you
+ * increase the <i>n</i>-th vector component by 1, whereat <i>n</i> is the methods parameter. The size of the vector
+ * is automatically managed.
+ */
+ public static class VectorAccumulator implements Accumulator<Integer, List<Integer>> {
+
+ /** Stores the accumulated vector components. */
+ private final List<Integer> resultVector = new ArrayList<Integer>();
+
+ /**
+ * Increases the result vector component at the specified position by 1.
+ */
+ @Override
+ public void add(final Integer position) {
+ updateResultVector(position, 1);
+ }
+
+ /**
+ * Increases the result vector component at the specified position by the specified delta.
+ */
+ private void updateResultVector(final int position, final int delta) {
+ // inflate the vector to contain the given position
+ while (this.resultVector.size() <= position) {
+ this.resultVector.add(0);
+ }
+
+ // increment the component value
+ final int component = this.resultVector.get(position);
+ this.resultVector.set(position, component + delta);
+ }
+
+ @Override
+ public List<Integer> getLocalValue() {
+ return this.resultVector;
+ }
+
+ @Override
+ public void resetLocal() {
+ // clear the result vector if the accumulator instance shall be reused
+ this.resultVector.clear();
+ }
+
+ @Override
+ public void merge(final Accumulator<Integer, List<Integer>> other) {
+ // merge two vector accumulators by adding their up their vector components
+ final List<Integer> otherVector = other.getLocalValue();
+ for (int index = 0; index < otherVector.size(); index++) {
+ updateResultVector(index, otherVector.get(index));
+ }
+ }
+
+ @Override
+ public void write(final DataOutputView out) throws IOException {
+ // binary serialization of the result vector:
+ // [number of components, component 0, component 1, ...]
+ out.writeInt(this.resultVector.size());
+ for (final Integer component : this.resultVector) {
+ out.writeInt(component);
+ }
+ }
+
+ @Override
+ public void read(final DataInputView in) throws IOException {
+ // binary deserialization of the result vector
+ final int size = in.readInt();
+ for (int numReadComponents = 0; numReadComponents < size; numReadComponents++) {
+ final int component = in.readInt();
+ this.resultVector.add(component);
+ }
+ }
+
+ }
+}