You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:07 UTC
[4/9] flink git commit: [FLINK-1514] [gelly] Fixed inconsistencies after merge
[FLINK-1514] [gelly] Fixed inconsistencies after merge
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/837508df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/837508df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/837508df
Branch: refs/heads/master
Commit: 837508df876c77cf22d7b289e76547df6a485bf0
Parents: 740d437
Author: Dániel Bali <ba...@gmail.com>
Authored: Sun Apr 19 21:17:03 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:24 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 31 -------------
.../main/java/org/apache/flink/graph/Graph.java | 12 +----
.../example/GSAConnectedComponentsExample.java | 10 ++---
.../GSASingleSourceShortestPathsExample.java | 4 +-
.../apache/flink/graph/gsa/GatherFunction.java | 2 +-
.../graph/gsa/GatherSumApplyIteration.java | 16 +++----
.../org/apache/flink/graph/gsa/Neighbor.java | 47 ++++++++++++++++++++
.../org/apache/flink/graph/gsa/RichEdge.java | 47 --------------------
8 files changed, 65 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 5245667..a36ab4b
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,35 +57,4 @@ under the License.
<version>${guava.version}</version>
</dependency>
</dependencies>
-
- <!-- See main pom.xml for explanation of profiles -->
- <profiles>
- <profile>
- <id>hadoop-1</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop1--><name>hadoop.profile</name><value>1</value>
- </property>
- </activation>
- <dependencies>
- <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </profile>
- <profile>
- <id>hadoop-2</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop2--><name>!hadoop.profile</name>
- </property>
- </activation>
- </profile>
- </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index d564f24..330c951 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -83,8 +83,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
private final DataSet<Edge<K, EV>> edges;
/**
- * Creates a graph from two DataSets: vertices and edges and allow setting
- * the undirected property
+ * Creates a graph from two DataSets: vertices and edges
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
@@ -919,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
- * @return Singleton DataSet containing the edge count
+ * @return a long integer representing the number of edges
*/
public long numberOfEdges() throws Exception {
return edges.count();
@@ -1016,13 +1015,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
- private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
- @Override
- public Boolean map(Integer n) {
- return (n == 1);
- }
- }
-
/**
* Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index d338b03..6a2c250 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
@@ -70,13 +70,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
// Extract the vertices as the result
- DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
+ DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
// emit result
if (fileOutput) {
- greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+ connectedComponents.writeAsCsv(outputPath, "\n", " ");
} else {
- greedyGraphColoring.print();
+ connectedComponents.print();
}
env.execute("GSA Connected Components");
@@ -99,7 +99,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
private static final class ConnectedComponentsGather
extends GatherFunction<Long, NullValue, Long> {
@Override
- public Long gather(RichEdge<Long, NullValue> richEdge) {
+ public Long gather(Neighbor<Long, NullValue> richEdge) {
return richEdge.getSrcVertexValue();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 8967a90..cc3b054 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.util.Collector;
/**
@@ -116,7 +116,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
private static final class SingleSourceShortestPathGather
extends GatherFunction<Double, Double, Double> {
@Override
- public Double gather(RichEdge<Double, Double> richEdge) {
+ public Double gather(Neighbor<Double, Double> richEdge) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 91a468d..0d110d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
- public abstract M gather(RichEdge<VV, EV> richEdge);
+ public abstract M gather(Neighbor<VV, EV> neighbor);
/**
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 1adab29..67ae094 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -117,15 +117,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
- // Prepare the rich edges
- DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+ // Prepare the neighbors
+ DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
.getWorkset()
.join(edgeDataSet)
.where(0)
.equalTo(0);
// Gather, sum and apply
- DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+ DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
DataSet<Vertex<K, VV>> appliedSet = summedSet
.join(iteration.getSolutionSet())
@@ -178,12 +178,12 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
@Override
- public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
- RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
- richEdge.f1.getValue());
+ public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
+ Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
+ neighborTuple.f1.getValue());
- K key = richEdge.f1.getTarget();
- M result = this.gatherFunction.gather(userRichEdge);
+ K key = neighborTuple.f1.getTarget();
+ M result = this.gatherFunction.gather(neighbor);
return new Tuple2<K, M>(key, result);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..2260022
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a <sourceVertex, edge> pair
+ * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Neighbor<VV extends Serializable, EV extends Serializable>
+ extends Tuple2<VV, EV> {
+
+ public Neighbor() {}
+
+ public Neighbor(VV src, EV edge) {
+ super(src, edge);
+ }
+
+ public VV getSrcVertexValue() {
+ return this.f0;
+ }
+
+ public EV getEdgeValue() {
+ return this.f1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
deleted file mode 100755
index 8d4b4d8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.io.Serializable;
-
-/**
- * This class represents a <sourceVertex, edge> pair
- * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-public class RichEdge<VV extends Serializable, EV extends Serializable>
- extends Tuple2<VV, EV> {
-
- public RichEdge() {}
-
- public RichEdge(VV src, EV edge) {
- super(src, edge);
- }
-
- public VV getSrcVertexValue() {
- return this.f0;
- }
-
- public EV getEdgeValue() {
- return this.f1;
- }
-}