You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/30 11:53:20 UTC

[1/2] flink git commit: [FLINK-1633] [gelly] Added getTriplets() method and test

Repository: flink
Updated Branches:
  refs/heads/master c284745ee -> 01adab53a


[FLINK-1633] [gelly] Added getTriplets() method and test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f66ab3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f66ab3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f66ab3a

Branch: refs/heads/master
Commit: 2f66ab3a2e616c8189774dffe62a793797f03bd9
Parents: c284745
Author: andralungu <lu...@gmail.com>
Authored: Tue Mar 3 23:03:43 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Mon Mar 30 11:49:33 2015 +0200

----------------------------------------------------------------------
 docs/gelly_guide.md                             |   3 +
 .../main/java/org/apache/flink/graph/Graph.java |  33 ++-
 .../java/org/apache/flink/graph/Triplet.java    |  78 +++++++
 .../graph/example/EuclideanGraphExample.java    | 213 +++++++++++++++++++
 .../graph/example/utils/EuclideanGraphData.java |  86 ++++++++
 .../example/EuclideanGraphExampleITCase.java    |  77 +++++++
 .../test/operations/GraphOperationsITCase.java  |  19 ++
 7 files changed, 508 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 0884405..7031550 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -165,6 +165,9 @@ long numberOfVertices()
 // get the number of edges
 long numberOfEdges()
 
+// get a DataSet of Triplets<srcVertex, trgVertex, edge>
+DataSet<Triplet<K, VV, EV>> getTriplets()
+
 {% endhighlight %}
 
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 334d5d3..a73beaf 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,6 +43,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -325,6 +326,36 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
+	 * This method allows access to the graph's edge values along with its source and target vertex values.
+	 *
+	 * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+	 */
+	public DataSet<Triplet<K, VV, EV>> getTriplets() {
+		return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>>() {
+
+					@Override
+					public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
+							throws Exception {
+
+						collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
+								edge.getValue()));
+					}
+				})
+				.join(this.getVertices()).where(1).equalTo(0)
+				.with(new FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>>() {
+
+					@Override
+					public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
+									Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+
+						collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
+								tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
+					}
+				});
+	}
+
+	/**
 	 * Apply a function to the attribute of each vertex in the graph.
 	 * 
 	 * @param mapper the map function to apply.
@@ -1348,4 +1379,4 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
new file mode 100644
index 0000000..a0ebb13
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+
+import java.io.Serializable;
+
+/**
+ * A Triplet stores and retrieves the edges along with their corresponding source and target vertices.
+ * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+		extends Tuple5<K, K, VV, VV, EV> {
+
+	public Triplet() {}
+
+	/**
+	 * Constructs a Triplet from a given source vertex, target vertex and edge
+	 *
+	 * @param srcVertex
+	 * @param trgVertex
+	 * @param edge
+	 */
+	public Triplet(Vertex<K, VV> srcVertex, Vertex<K, VV> trgVertex, Edge<K, EV> edge) {
+		this.f0 = srcVertex.f0;
+		this.f2 = srcVertex.f1;
+		this.f1 = trgVertex.f0;
+		this.f3 = trgVertex.f1;
+		this.f4 = edge.f2;
+	}
+
+	/**
+	 * Constructs a Triplet from its src vertex id, trg vertex id, src vertex value,
+	 * trg vertex value and edge value respectively.
+	 *
+	 * @param srcId
+	 * @param trgId
+	 * @param srcVal
+	 * @param trgVal
+	 * @param edgeVal
+	 */
+	public Triplet(K srcId, K trgId, VV srcVal, VV trgVal, EV edgeVal) {
+		super(srcId, trgId, srcVal, trgVal, edgeVal);
+	}
+
+	public Vertex<K, VV> getSrcVertex() {
+		return new Vertex<K, VV>(this.f0, this.f2);
+	}
+
+	public Vertex<K, VV> getTrgVertex() {
+		return new Vertex<K, VV>(this.f1, this.f3);
+	}
+
+	public Edge<K, EV> getEdge() {
+		return new Edge<K, EV>(this.f0, this.f1, this.f4);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
new file mode 100644
index 0000000..10ad629
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * 	<li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
+ * 	the value being formed of two doubles separated by a comma.
+ * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
+ * 	<li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphExample &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphExample implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// the edge value will be the Euclidean distance between its src and trg vertex
+		DataSet<Tuple3<Long, Long, Double>> resultedTriplets = graph.getTriplets()
+				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+					@Override
+					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+							throws Exception {
+
+						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+						Edge<Long, Double> edge = triplet.getEdge();
+
+						edge.setValue(srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+
+						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+								edge.getValue());
+					}
+				});
+
+		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(resultedTriplets,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> distance) throws Exception {
+						return distance.f1;
+					}
+				});
+
+		// retrieve the edges from the final result
+		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Euclidean Graph Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Weighing a graph by computing the Euclidean distance " +
+				"between its vertices";
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 3) {
+				fileOutput = true;
+				verticesInputPath = args[0];
+				edgesInputPath = args[1];
+				outputPath = args[2];
+			} else {
+				System.out.println("Executing Euclidean Graph example with default parameters and built-in default data.");
+				System.out.println("Provide parameters to read input data from files.");
+				System.out.println("See the documentation for the correct format of input files.");
+				System.err.println("Usage: EuclideanGraphExample <input vertices path> <input edges path>" +
+						" <output path>");
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(verticesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
+
+						@Override
+						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
+							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
new file mode 100644
index 0000000..0d13f72
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.EuclideanGraphExample;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+	public static final int NUM_VERTICES = 9;
+
+	public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+			"6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+	public static DataSet<Vertex<Long, EuclideanGraphExample.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, EuclideanGraphExample.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphExample.Point>>();
+		for(int i=1; i<=NUM_VERTICES; i++) {
+			vertices.add(new Vertex<Long, EuclideanGraphExample.Point>(new Long(i),
+					new EuclideanGraphExample.Point(new Double(i), new Double(i))));
+		}
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+			"3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+			"7,8\n" + "7,9\n" +  "8,9";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+			"2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+			"4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+			"6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+			"8,9,1.4142135623730951";
+
+	private EuclideanGraphData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
new file mode 100644
index 0000000..fa1c246
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.EuclideanGraphExample;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class EuclideanGraphExampleITCase extends MultipleProgramsTestBase {
+
+	private String verticesPath;
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public EuclideanGraphExampleITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testGraphWeightingExanple() throws Exception {
+		EuclideanGraphExample.main(new String[]{verticesPath, edgesPath, resultPath});
+		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index 6210f43..1b9d5ac 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -265,4 +265,23 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 					"5,1,51\n" +
 					"6,1,61\n";
 	}
+
+	@Test
+	public void testTriplets() throws Exception {
+		/*
+		 * Test getTriplets()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		graph.getTriplets().writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" +
+				"2,3,2,3,23\n" + "3,4,3,4,34\n" +
+				"3,5,3,5,35\n" + "4,5,4,5,45\n" +
+				"5,1,5,1,51\n";
+	}
 }
\ No newline at end of file


[2/2] flink git commit: [FLINK-1633] [gelly] some cosmetic changes to getTriplets and EuclideanGraphExample

Posted by va...@apache.org.
[FLINK-1633] [gelly] some cosmetic changes to getTriplets and EuclideanGraphExample

This closes #452


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01adab53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01adab53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01adab53

Branch: refs/heads/master
Commit: 01adab53a53ab79a95ab14b4f86e94fd78c0fa40
Parents: 2f66ab3
Author: vasia <va...@gmail.com>
Authored: Sun Mar 29 19:52:44 2015 +0200
Committer: Vasia Kalavri <va...@apache.org>
Committed: Mon Mar 30 11:50:54 2015 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/graph/Triplet.java       | 2 ++
 .../apache/flink/graph/example/EuclideanGraphExample.java   | 9 +++------
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01adab53/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index a0ebb13..b85987d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -33,6 +33,8 @@ import java.io.Serializable;
 public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 		extends Tuple5<K, K, VV, VV, EV> {
 
+	private static final long serialVersionUID = 1L;
+
 	public Triplet() {}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/01adab53/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
index 10ad629..fa08084 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
@@ -71,7 +71,7 @@ public class EuclideanGraphExample implements ProgramDescription {
 		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
 
 		// the edge value will be the Euclidean distance between its src and trg vertex
-		DataSet<Tuple3<Long, Long, Double>> resultedTriplets = graph.getTriplets()
+		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
 				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
 
 					@Override
@@ -80,16 +80,13 @@ public class EuclideanGraphExample implements ProgramDescription {
 
 						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
 						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
-						Edge<Long, Double> edge = triplet.getEdge();
-
-						edge.setValue(srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
 
 						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
-								edge.getValue());
+								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
 					}
 				});
 
-		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(resultedTriplets,
+		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
 				new MapFunction<Tuple2<Double, Double>, Double>() {
 
 					@Override