You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/30 11:53:20 UTC
[1/2] flink git commit: [FLINK-1633] [gelly] Added getTriplets()
method and test
Repository: flink
Updated Branches:
refs/heads/master c284745ee -> 01adab53a
[FLINK-1633] [gelly] Added getTriplets() method and test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f66ab3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f66ab3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f66ab3a
Branch: refs/heads/master
Commit: 2f66ab3a2e616c8189774dffe62a793797f03bd9
Parents: c284745
Author: andralungu <lu...@gmail.com>
Authored: Tue Mar 3 23:03:43 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Mon Mar 30 11:49:33 2015 +0200
----------------------------------------------------------------------
docs/gelly_guide.md | 3 +
.../main/java/org/apache/flink/graph/Graph.java | 33 ++-
.../java/org/apache/flink/graph/Triplet.java | 78 +++++++
.../graph/example/EuclideanGraphExample.java | 213 +++++++++++++++++++
.../graph/example/utils/EuclideanGraphData.java | 86 ++++++++
.../example/EuclideanGraphExampleITCase.java | 77 +++++++
.../test/operations/GraphOperationsITCase.java | 19 ++
7 files changed, 508 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 0884405..7031550 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -165,6 +165,9 @@ long numberOfVertices()
// get the number of edges
long numberOfEdges()
+// get a DataSet of Triplets<srcVertex, trgVertex, edge>
+DataSet<Triplet<K, VV, EV>> getTriplets()
+
{% endhighlight %}
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 334d5d3..a73beaf 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,6 +43,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -325,6 +326,36 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
+ * This method allows access to the graph's edge values along with the values of each edge's source and target vertices.
+ *
+ * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+ */
+ public DataSet<Triplet<K, VV, EV>> getTriplets() {
+ return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
+ .with(new FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>>() {
+
+ @Override
+ public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
+ throws Exception {
+
+ collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
+ edge.getValue()));
+ }
+ })
+ .join(this.getVertices()).where(1).equalTo(0)
+ .with(new FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>>() {
+
+ @Override
+ public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
+ Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+
+ collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
+ tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
+ }
+ });
+ }
+
+ /**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
@@ -1348,4 +1379,4 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
new file mode 100644
index 0000000..a0ebb13
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+
+import java.io.Serializable;
+
+/**
+ * A Triplet stores an edge along with its corresponding source and target vertices.
+ * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ extends Tuple5<K, K, VV, VV, EV> {
+
+ public Triplet() {}
+
+ /**
+ * Constructs a Triplet from a given source vertex, target vertex and edge
+ *
+ * @param srcVertex
+ * @param trgVertex
+ * @param edge
+ */
+ public Triplet(Vertex<K, VV> srcVertex, Vertex<K, VV> trgVertex, Edge<K, EV> edge) {
+ this.f0 = srcVertex.f0;
+ this.f2 = srcVertex.f1;
+ this.f1 = trgVertex.f0;
+ this.f3 = trgVertex.f1;
+ this.f4 = edge.f2;
+ }
+
+ /**
+ * Constructs a Triplet from its source vertex id, target vertex id, source vertex value,
+ * target vertex value and edge value respectively.
+ *
+ * @param srcId
+ * @param trgId
+ * @param srcVal
+ * @param trgVal
+ * @param edgeVal
+ */
+ public Triplet(K srcId, K trgId, VV srcVal, VV trgVal, EV edgeVal) {
+ super(srcId, trgId, srcVal, trgVal, edgeVal);
+ }
+
+ public Vertex<K, VV> getSrcVertex() {
+ return new Vertex<K, VV>(this.f0, this.f2);
+ }
+
+ public Vertex<K, VV> getTrgVertex() {
+ return new Vertex<K, VV>(this.f1, this.f3);
+ }
+
+ public Edge<K, EV> getEdge() {
+ return new Edge<K, EV>(this.f0, this.f1, this.f4);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
new file mode 100644
index 0000000..10ad629
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
+ * the value being formed of two doubles separated by a comma.
+ * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
+ * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphExample &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphExample implements ProgramDescription {
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+ Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+ // the edge value will be the Euclidean distance between its src and trg vertex
+ DataSet<Tuple3<Long, Long, Double>> resultedTriplets = graph.getTriplets()
+ .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+ @Override
+ public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+ throws Exception {
+
+ Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+ Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+ Edge<Long, Double> edge = triplet.getEdge();
+
+ edge.setValue(srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+
+ return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+ edge.getValue());
+ }
+ });
+
+ Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(resultedTriplets,
+ new MapFunction<Tuple2<Double, Double>, Double>() {
+
+ @Override
+ public Double map(Tuple2<Double, Double> distance) throws Exception {
+ return distance.f1;
+ }
+ });
+
+ // retrieve the edges from the final result
+ DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+ // emit result
+ if (fileOutput) {
+ result.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ result.print();
+ }
+
+ env.execute("Euclidean Graph Example");
+ }
+
+ @Override
+ public String getDescription() {
+ return "Weighing a graph by computing the Euclidean distance " +
+ "between its vertices";
+ }
+
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ /**
+ * A simple two-dimensional point.
+ */
+ public static class Point implements Serializable {
+
+ public double x, y;
+
+ public Point() {}
+
+ public Point(double x, double y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ public double euclideanDistance(Point other) {
+ return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+ }
+
+ @Override
+ public String toString() {
+ return x + " " + y;
+ }
+ }
+
+ // ******************************************************************************************************************
+ // UTIL METHODS
+ // ******************************************************************************************************************
+
+ private static boolean fileOutput = false;
+
+ private static String verticesInputPath = null;
+
+ private static String edgesInputPath = null;
+
+ private static String outputPath = null;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ if (args.length == 3) {
+ fileOutput = true;
+ verticesInputPath = args[0];
+ edgesInputPath = args[1];
+ outputPath = args[2];
+ } else {
+ System.out.println("Executing Euclidean Graph example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("See the documentation for the correct format of input files.");
+ System.err.println("Usage: EuclideanGraphExample <input vertices path> <input edges path>" +
+ " <output path>");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(verticesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
+
+ @Override
+ public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
+ return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
+ }
+ });
+ } else {
+ return EuclideanGraphData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+
+ @Override
+ public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+ return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+ }
+ });
+ } else {
+ return EuclideanGraphData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
new file mode 100644
index 0000000..0d13f72
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.EuclideanGraphExample;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+ public static final int NUM_VERTICES = 9;
+
+ public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+ "6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+ public static DataSet<Vertex<Long, EuclideanGraphExample.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, EuclideanGraphExample.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphExample.Point>>();
+ for(int i=1; i<=NUM_VERTICES; i++) {
+ vertices.add(new Vertex<Long, EuclideanGraphExample.Point>(new Long(i),
+ new EuclideanGraphExample.Point(new Double(i), new Double(i))));
+ }
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+ "3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+ "7,8\n" + "7,9\n" + "8,9";
+
+ public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+ edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+ edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+ edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+ edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+ edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+ edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
+ edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
+ edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
+ edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
+ edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+ edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+ edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
+ edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
+ edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+ "2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+ "4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+ "6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+ "8,9,1.4142135623730951";
+
+ private EuclideanGraphData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
new file mode 100644
index 0000000..fa1c246
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.EuclideanGraphExample;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class EuclideanGraphExampleITCase extends MultipleProgramsTestBase {
+
+ private String verticesPath;
+
+ private String edgesPath;
+
+ private String resultPath;
+
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public EuclideanGraphExampleITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ File verticesFile = tempFolder.newFile();
+ Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
+
+ verticesPath = verticesFile.toURI().toString();
+ edgesPath = edgesFile.toURI().toString();
+ }
+
+ @Test
+ public void testGraphWeightingExanple() throws Exception {
+ EuclideanGraphExample.main(new String[]{verticesPath, edgesPath, resultPath});
+ expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f66ab3a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index 6210f43..1b9d5ac 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -265,4 +265,23 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
"5,1,51\n" +
"6,1,61\n";
}
+
+ @Test
+ public void testTriplets() throws Exception {
+ /*
+ * Test getTriplets()
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ graph.getTriplets().writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" +
+ "2,3,2,3,23\n" + "3,4,3,4,34\n" +
+ "3,5,3,5,35\n" + "4,5,4,5,45\n" +
+ "5,1,5,1,51\n";
+ }
}
\ No newline at end of file
[2/2] flink git commit: [FLINK-1633] [gelly] some cosmetic changes to
getTriplets and EuclideanGraphExample
Posted by va...@apache.org.
[FLINK-1633] [gelly] some cosmetic changes to getTriplets and EuclideanGraphExample
This closes #452
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01adab53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01adab53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01adab53
Branch: refs/heads/master
Commit: 01adab53a53ab79a95ab14b4f86e94fd78c0fa40
Parents: 2f66ab3
Author: vasia <va...@gmail.com>
Authored: Sun Mar 29 19:52:44 2015 +0200
Committer: Vasia Kalavri <va...@apache.org>
Committed: Mon Mar 30 11:50:54 2015 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/graph/Triplet.java | 2 ++
.../apache/flink/graph/example/EuclideanGraphExample.java | 9 +++------
2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/01adab53/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index a0ebb13..b85987d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -33,6 +33,8 @@ import java.io.Serializable;
public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
extends Tuple5<K, K, VV, VV, EV> {
+ private static final long serialVersionUID = 1L;
+
public Triplet() {}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/01adab53/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
index 10ad629..fa08084 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
@@ -71,7 +71,7 @@ public class EuclideanGraphExample implements ProgramDescription {
Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
// the edge value will be the Euclidean distance between its src and trg vertex
- DataSet<Tuple3<Long, Long, Double>> resultedTriplets = graph.getTriplets()
+ DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
@Override
@@ -80,16 +80,13 @@ public class EuclideanGraphExample implements ProgramDescription {
Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
- Edge<Long, Double> edge = triplet.getEdge();
-
- edge.setValue(srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
- edge.getValue());
+ srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
}
});
- Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(resultedTriplets,
+ Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
new MapFunction<Tuple2<Double, Double>, Double>() {
@Override