You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/26 15:08:19 UTC

flink git commit: [FLINK-3799] [gelly] Graph checksum should execute single job

Repository: flink
Updated Branches:
  refs/heads/master a58cd80f9 -> 06bf4bf3f


[FLINK-3799] [gelly] Graph checksum should execute single job

This closes #1922


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06bf4bf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06bf4bf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06bf4bf3

Branch: refs/heads/master
Commit: 06bf4bf3f465b1c019b40dfd7c1662507d7fa2e3
Parents: a58cd80
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Apr 21 08:36:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Apr 26 09:07:22 2016 -0400

----------------------------------------------------------------------
 .../flink/graph/scala/utils/package.scala       | 32 +++++++++++++-------
 .../apache/flink/graph/utils/GraphUtils.java    | 29 +++++++++++++-----
 2 files changed, 43 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06bf4bf3/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
index 954e6c3..fe4ab5b 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.graph.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.Utils.ChecksumHashCode
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.java.Utils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.AbstractID
 
 import scala.reflect.ClassTag
 
@@ -35,16 +35,26 @@ package object utils {
   TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
 
     /**
-      * Computes the ChecksumHashCode over the Graph.
-      *
-      * @return the ChecksumHashCode over the vertices and edges.
-      */
+     * Convenience method to get the count (number of elements) of a Graph
+     * as well as the checksum (sum over element hashes). The vertex and
+     * edge DataSets are processed in a single job and the resultant counts
+     * and checksums are merged locally.
+     *
+     * @return the checksum over the vertices and edges
+     */
     @throws(classOf[Exception])
-    def checksumHashCode(): ChecksumHashCode = {
-      val checksum: ChecksumHashCode = self.getVertices.checksumHashCode()
-      checksum.add(self.getEdges checksumHashCode())
+    def checksumHashCode(): Utils.ChecksumHashCode = {
+      val verticesId = new AbstractID().toString
+      self.getVertices.output(new Utils.ChecksumHashCodeHelper[Vertex[K,VV]](verticesId))
+
+      val edgesId = new AbstractID().toString
+      self.getEdges.output(new Utils.ChecksumHashCodeHelper[Edge[K,EV]](edgesId))
+
+      val res = self.getWrappedGraph.getContext.execute()
+
+      val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId)
+      checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId))
       checksum
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/06bf4bf3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index e7d3a16..009d791 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -18,21 +18,36 @@
 
 package org.apache.flink.graph.utils;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.AbstractID;
 
 public class GraphUtils {
 
 	/**
-	 * Computes the checksum over the Graph
+	 * Convenience method to get the count (number of elements) of a Graph
+	 * as well as the checksum (sum over element hashes). The vertex and
+	 * edge DataSets are processed in a single job and the resultant counts
+	 * and checksums are merged locally.
 	 *
-	 * @return the checksum over the vertices and edges.
+	 * @param graph Graph over which to compute the count and checksum
+	 * @return the checksum over the vertices and edges
 	 */
-	public static Utils.ChecksumHashCode checksumHashCode(Graph graph) throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices());
-		checksum.add(DataSetUtils.checksumHashCode(graph.getEdges()));
+	public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
+		final String verticesId = new AbstractID().toString();
+		graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices");
+
+		final String edgesId = new AbstractID().toString();
+		graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges");
+
+		JobExecutionResult res = graph.getContext().execute();
+
+		Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId);
+		checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
+
 		return checksum;
 	}
 }