You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/26 15:08:19 UTC
flink git commit: [FLINK-3799] [gelly] Graph checksum should execute single job
Repository: flink
Updated Branches:
refs/heads/master a58cd80f9 -> 06bf4bf3f
[FLINK-3799] [gelly] Graph checksum should execute single job
This closes #1922
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06bf4bf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06bf4bf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06bf4bf3
Branch: refs/heads/master
Commit: 06bf4bf3f465b1c019b40dfd7c1662507d7fa2e3
Parents: a58cd80
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Apr 21 08:36:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Apr 26 09:07:22 2016 -0400
----------------------------------------------------------------------
.../flink/graph/scala/utils/package.scala | 32 +++++++++++++-------
.../apache/flink/graph/utils/GraphUtils.java | 29 +++++++++++++-----
2 files changed, 43 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06bf4bf3/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
index 954e6c3..fe4ab5b 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
@@ -19,9 +19,9 @@
package org.apache.flink.graph.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.Utils.ChecksumHashCode
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.java.Utils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.AbstractID
import scala.reflect.ClassTag
@@ -35,16 +35,26 @@ package object utils {
TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
/**
- * Computes the ChecksumHashCode over the Graph.
- *
- * @return the ChecksumHashCode over the vertices and edges.
- */
+ * Convenience method to get the count (number of elements) of a Graph
+ * as well as the checksum (sum over element hashes). The vertex and
+ * edge DataSets are processed in a single job and the resultant counts
+ * and checksums are merged locally.
+ *
+ * @return the checksum over the vertices and edges
+ */
@throws(classOf[Exception])
- def checksumHashCode(): ChecksumHashCode = {
- val checksum: ChecksumHashCode = self.getVertices.checksumHashCode()
- checksum.add(self.getEdges checksumHashCode())
+ def checksumHashCode(): Utils.ChecksumHashCode = {
+ val verticesId = new AbstractID().toString
+ self.getVertices.output(new Utils.ChecksumHashCodeHelper[Vertex[K,VV]](verticesId))
+
+ val edgesId = new AbstractID().toString
+ self.getEdges.output(new Utils.ChecksumHashCodeHelper[Edge[K,EV]](edgesId))
+
+ val res = self.getWrappedGraph.getContext.execute()
+
+ val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId)
+ checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId))
checksum
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/06bf4bf3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index e7d3a16..009d791 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -18,21 +18,36 @@
package org.apache.flink.graph.utils;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.AbstractID;
public class GraphUtils {
/**
- * Computes the checksum over the Graph
+ * Convenience method to get the count (number of elements) of a Graph
+ * as well as the checksum (sum over element hashes). The vertex and
+ * edge DataSets are processed in a single job and the resultant counts
+ * and checksums are merged locally.
*
- * @return the checksum over the vertices and edges.
+ * @param graph Graph over which to compute the count and checksum
+ * @return the checksum over the vertices and edges
*/
- public static Utils.ChecksumHashCode checksumHashCode(Graph graph) throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices());
- checksum.add(DataSetUtils.checksumHashCode(graph.getEdges()));
+ public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
+ final String verticesId = new AbstractID().toString();
+ graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices");
+
+ final String edgesId = new AbstractID().toString();
+ graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges");
+
+ JobExecutionResult res = graph.getContext().execute();
+
+ Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId);
+ checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
+
return checksum;
}
}