You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:07 UTC

[4/9] flink git commit: [FLINK-1514] [gelly] Fixed inconsistencies after merge

[FLINK-1514] [gelly] Fixed inconsistencies after merge


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/837508df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/837508df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/837508df

Branch: refs/heads/master
Commit: 837508df876c77cf22d7b289e76547df6a485bf0
Parents: 740d437
Author: Dániel Bali <ba...@gmail.com>
Authored: Sun Apr 19 21:17:03 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:24 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               | 31 -------------
 .../main/java/org/apache/flink/graph/Graph.java | 12 +----
 .../example/GSAConnectedComponentsExample.java  | 10 ++---
 .../GSASingleSourceShortestPathsExample.java    |  4 +-
 .../apache/flink/graph/gsa/GatherFunction.java  |  2 +-
 .../graph/gsa/GatherSumApplyIteration.java      | 16 +++----
 .../org/apache/flink/graph/gsa/Neighbor.java    | 47 ++++++++++++++++++++
 .../org/apache/flink/graph/gsa/RichEdge.java    | 47 --------------------
 8 files changed, 65 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 5245667..a36ab4b
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,35 +57,4 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 	</dependencies>
-
-    <!-- See main pom.xml for explanation of profiles -->
-    <profiles>
-        <profile>
-            <id>hadoop-1</id>
-            <activation>
-                <property>
-                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
-                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
-                </property>
-            </activation>
-            <dependencies>
-                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
-                <dependency>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                    <version>${guava.version}</version>
-                    <scope>provided</scope>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
-            <id>hadoop-2</id>
-            <activation>
-                <property>
-                    <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-                    <!--hadoop2--><name>!hadoop.profile</name>
-                </property>
-            </activation>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index d564f24..330c951 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -83,8 +83,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	private final DataSet<Edge<K, EV>> edges;
 
 	/**
-	 * Creates a graph from two DataSets: vertices and edges and allow setting
-	 * the undirected property
+	 * Creates a graph from two DataSets: vertices and edges
 	 * 
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.
@@ -919,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * @return Singleton DataSet containing the edge count
+	 * @return a long integer representing the number of edges
 	 */
 	public long numberOfEdges() throws Exception {
 		return edges.count();
@@ -1016,13 +1015,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer, Boolean> {
-		@Override
-		public Boolean map(Integer n) {
-			return (n == 1);
-		}
-	}
-
 	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index d338b03..6a2c250 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -70,13 +70,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
 
 		// Extract the vertices as the result
-		DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
 
 		// emit result
 		if (fileOutput) {
-			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
 		} else {
-			greedyGraphColoring.print();
+			connectedComponents.print();
 		}
 
 		env.execute("GSA Connected Components");
@@ -99,7 +99,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	private static final class ConnectedComponentsGather
 			extends GatherFunction<Long, NullValue, Long> {
 		@Override
-		public Long gather(RichEdge<Long, NullValue> richEdge) {
+		public Long gather(Neighbor<Long, NullValue> richEdge) {
 
 			return richEdge.getSrcVertexValue();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 8967a90..cc3b054 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.util.Collector;
 
 /**
@@ -116,7 +116,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	private static final class SingleSourceShortestPathGather
 			extends GatherFunction<Double, Double, Double> {
 		@Override
-		public Double gather(RichEdge<Double, Double> richEdge) {
+		public Double gather(Neighbor<Double, Double> richEdge) {
 			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
 		}
 	};

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 91a468d..0d110d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 
 public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
 
-	public abstract M gather(RichEdge<VV, EV> richEdge);
+	public abstract M gather(Neighbor<VV, EV> neighbor);
 
 	/**
 	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 1adab29..67ae094 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -117,15 +117,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
 				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
 
-		// Prepare the rich edges
-		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+		// Prepare the neighbors
+		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
 				.getWorkset()
 				.join(edgeDataSet)
 				.where(0)
 				.equalTo(0);
 
 		// Gather, sum and apply
-		DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+		DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
 		DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
 		DataSet<Vertex<K, VV>> appliedSet = summedSet
 				.join(iteration.getSolutionSet())
@@ -178,12 +178,12 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 
 		@Override
-		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
-			RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
-					richEdge.f1.getValue());
+		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
+			Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
+					neighborTuple.f1.getValue());
 
-			K key = richEdge.f1.getTarget();
-			M result = this.gatherFunction.gather(userRichEdge);
+			K key = neighborTuple.f1.getTarget();
+			M result = this.gatherFunction.gather(neighbor);
 			return new Tuple2<K, M>(key, result);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..2260022
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a <sourceVertex, edge> pair
+ * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Neighbor<VV extends Serializable, EV extends Serializable>
+		extends Tuple2<VV, EV> {
+
+	public Neighbor() {}
+
+	public Neighbor(VV src, EV edge) {
+		super(src, edge);
+	}
+
+	public VV getSrcVertexValue() {
+		return this.f0;
+	}
+
+	public EV getEdgeValue() {
+		return this.f1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
deleted file mode 100755
index 8d4b4d8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.io.Serializable;
-
-/**
- * This class represents a <sourceVertex, edge> pair
- * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-public class RichEdge<VV extends Serializable, EV extends Serializable>
-		extends Tuple2<VV, EV> {
-
-	public RichEdge() {}
-
-	public RichEdge(VV src, EV edge) {
-		super(src, edge);
-	}
-
-	public VV getSrcVertexValue() {
-		return this.f0;
-	}
-
-	public EV getEdgeValue() {
-		return this.f1;
-	}
-}