You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/02/06 15:45:35 UTC

[1/2] flink git commit: [FLINK-5693] [gelly] ChecksumHashCode DataSetAnalytic

Repository: flink
Updated Branches:
  refs/heads/master fffc8f055 -> 490162259


[FLINK-5693] [gelly] ChecksumHashCode DataSetAnalytic

Adds a DataSetAnalytic that checksums and counts elements from a
DataSet. This allows DataSetUtils.checksumHashCode to be deprecated.

This closes #3244


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7e5705c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7e5705c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7e5705c

Branch: refs/heads/master
Commit: a7e5705c1cd73b54f78348b4f6c3fc7250d7fd84
Parents: fffc8f0
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:18 2017 -0500

----------------------------------------------------------------------
 .../graph/asm/dataset/ChecksumHashCode.java     | 172 +++++++++++++++++++
 .../graph/library/metric/ChecksumHashCode.java  |  29 ++--
 .../graph/asm/dataset/ChecksumHashCodeTest.java |  57 ++++++
 .../annotate/directed/EdgeDegreesPairTest.java  |  16 +-
 .../directed/EdgeSourceDegreesTest.java         |  16 +-
 .../directed/EdgeTargetDegreesTest.java         |  16 +-
 .../annotate/directed/VertexDegreesTest.java    |  30 ++--
 .../annotate/directed/VertexInDegreeTest.java   |  30 ++--
 .../annotate/directed/VertexOutDegreeTest.java  |  30 ++--
 .../annotate/undirected/EdgeDegreePairTest.java |  34 ++--
 .../undirected/EdgeSourceDegreeTest.java        |  34 ++--
 .../undirected/EdgeTargetDegreeTest.java        |  34 ++--
 .../annotate/undirected/VertexDegreeTest.java   |  52 +++---
 .../filter/undirected/MaximumDegreeTest.java    |   4 +-
 .../LocalClusteringCoefficientTest.java         |  11 +-
 .../directed/TriangleListingTest.java           |   8 +-
 .../LocalClusteringCoefficientTest.java         |  12 +-
 .../undirected/TriangleListingTest.java         |  12 +-
 .../graph/library/link_analysis/HITSTest.java   |  12 +-
 .../library/metric/ChecksumHashCodeTest.java    |   4 +-
 .../library/similarity/JaccardIndexTest.java    |   8 +-
 21 files changed, 459 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
new file mode 100644
index 0000000..13db7a0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SimpleAccumulator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+
+import java.io.IOException;
+
+/**
+ * Convenience analytic to get the count (number of elements) of a
+ * {@link DataSet} as well as the checksum (sum of unsigned 32-bit element hashCodes).
+ *
+ * @param <T> element type
+ */
+public class ChecksumHashCode<T>
+extends AbstractDataSetAnalytic<T, Checksum> {
+
+	private static final String CHECKSUM = "checksum";
+
+	private ChecksumHashCodeHelper<T> checksumHashCodeHelper;
+
+	@Override
+	public ChecksumHashCode<T> run(DataSet<T> input)
+			throws Exception {
+		super.run(input);
+
+		checksumHashCodeHelper = new ChecksumHashCodeHelper<>();
+
+		input
+			.output(checksumHashCodeHelper)
+				.name("ChecksumHashCode");
+
+		return this;
+	}
+
+	@Override
+	public Checksum getResult() {
+		return checksumHashCodeHelper.getAccumulator(env, CHECKSUM);
+	}
+
+	private static class ChecksumHashCodeHelper<U>
+	extends AnalyticHelper<U> {
+		private long count;
+		private long checksum;
+
+		@Override
+		public void writeRecord(U record) throws IOException {
+			count++;
+			// convert 32-bit integer to non-negative long
+			checksum += record.hashCode() & 0xffffffffL;
+		}
+
+		@Override
+		public void close() throws IOException {
+			addAccumulator(CHECKSUM, new Checksum(count, checksum));
+		}
+	}
+
+	public static class Checksum
+	implements SimpleAccumulator<Checksum> {
+		private long count;
+
+		private long checksum;
+
+		/**
+		 * Instantiate a result; note that add, merge, and resetLocal mutate it.
+		 *
+		 * @param count number of elements
+		 * @param checksum sum of unsigned 32-bit element hashCodes
+		 */
+		public Checksum(long count, long checksum) {
+			this.count = count;
+			this.checksum = checksum;
+		}
+
+		/**
+		 * Get the number of elements.
+		 *
+		 * @return number of elements
+		 */
+		public long getCount() {
+			return count;
+		}
+
+		/**
+		 * Get the checksum over the hashCode() of elements.
+		 *
+		 * @return checksum
+		 */
+		public long getChecksum() {
+			return checksum;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count);
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(count)
+				.append(checksum)
+				.toHashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Checksum rhs = (Checksum)obj;
+
+			return new EqualsBuilder()
+				.append(count, rhs.count)
+				.append(checksum, rhs.checksum)
+				.isEquals();
+		}
+
+		// Methods implementing SimpleAccumulator
+
+		@Override
+		public void add(Checksum value) {
+			count += value.count;
+			checksum += value.checksum;
+		}
+
+		@Override
+		public Checksum getLocalValue() {
+			return this;
+		}
+
+		@Override
+		public void resetLocal() {
+			count = 0;
+			checksum = 0;
+		}
+
+		@Override
+		public void merge(Accumulator<Checksum, Checksum> other) {
+			add(other.getLocalValue());
+		}
+
+		@Override
+		public Accumulator<Checksum, Checksum> clone() {
+			return new Checksum(count, checksum);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
index 5ba5a66..d2eeb41 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.graph.library.metric;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 
 /**
  * Convenience method to get the count (number of elements) of a Graph
@@ -37,35 +35,30 @@ import org.apache.flink.util.AbstractID;
  * @param <EV> edge value type
  */
 public class ChecksumHashCode<K, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> {
+extends AbstractGraphAnalytic<K, VV, EV, Checksum> {
 
-	private String verticesId = new AbstractID().toString();
+	private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Vertex<K, VV>> vertexChecksum;
 
-	private String edgesId = new AbstractID().toString();
+	private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Edge<K, EV>> edgeChecksum;
 
 	@Override
 	public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {
 		super.run(input);
 
-		input
-			.getVertices()
-			.output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId))
-				.name("ChecksumHashCode vertices");
+		vertexChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>();
+		vertexChecksum.run(input.getVertices());
 
-		input
-			.getEdges()
-			.output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId))
-				.name("ChecksumHashCode edges");
+		edgeChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>();
+		edgeChecksum.run(input.getEdges());
 
 		return this;
 	}
 
 	@Override
-	public Utils.ChecksumHashCode getResult() {
-		JobExecutionResult res = env.getLastJobExecutionResult();
-		Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId);
-		checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
+	public Checksum getResult() {
+		Checksum checksum = (Checksum) vertexChecksum.getResult().clone(); // copy; never mutate the vertex result
+		checksum.add(edgeChecksum.getResult());
 		return checksum;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
new file mode 100644
index 0000000..d25f9b6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChecksumHashCodeTest {
+
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setup() throws Exception {
+		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().enableObjectReuse();
+	}
+
+	/** Checksums the Longs 0 through 9, each of whose hashCode() equals its value. */
+	@Test
+	public void testChecksumHashCode() throws Exception {
+		long[] values = new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+		List<Long> elements = Arrays.asList(ArrayUtils.toObject(values));
+		DataSet<Long> input = env.fromCollection(elements);
+
+		Checksum result = new ChecksumHashCode<Long>().run(input).execute();
+
+		int size = elements.size();
+		assertEquals(size, result.getCount());
+		// checksum is the sum 0 + 1 + ... + (size - 1)
+		assertEquals(size * (size - 1) / 2, result.getChecksum());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 485794c..0540f14 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
 			"(3,4,((null),(4,2,2),(1,0,1)))\n" +
 			"(5,3,((null),(1,1,0),(4,2,2)))";
 
-		DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degrees = directedSimpleGraph
+		DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedSimpleGraph
 			.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreesPair.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph
+			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>>()
+			.run(degreesPair)
+			.execute();
 
 		assertEquals(12009, checksum.getCount());
 		assertEquals(0x0000176fe94702a3L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 454e8d1..8d52889 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
 			"(3,4,((null),(4,2,2)))\n" +
 			"(5,3,((null),(1,1,0)))";
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedSimpleGraph
 				.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph
+			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
+			.run(sourceDegrees)
+			.execute();
 
 		assertEquals(12009, checksum.getCount());
 		assertEquals(0x0000162435fde1d9L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 7add46b..eb0b892 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
 			"(3,4,((null),(1,0,1)))\n" +
 			"(5,3,((null),(4,2,2)))";
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedSimpleGraph
 				.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph
+			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
+			.run(targetDegrees)
+			.execute();
 
 		assertEquals(12009, checksum.getCount());
 		assertEquals(0x0000160af450cc81L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index e635bfa..06737b5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -38,7 +38,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithSimpleDirectedGraph()
 			throws Exception {
-		DataSet<Vertex<IntValue, Degrees>> vertexDegrees = directedSimpleGraph
+		DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
 			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
 
 		String expectedResult =
@@ -49,13 +49,13 @@ extends AsmTestBase {
 			"(4,(1,0,1))\n" +
 			"(5,(1,1,0))";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithSimpleUndirectedGraph()
 			throws Exception {
-		DataSet<Vertex<IntValue, Degrees>> vertexDegrees = undirectedSimpleGraph
+		DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
 			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
 
 		String expectedResult =
@@ -66,21 +66,21 @@ extends AsmTestBase {
 			"(4,(1,1,1))\n" +
 			"(5,(1,1,1))";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithEmptyGraph()
 			throws Exception {
-		DataSet<Vertex<LongValue, Degrees>> vertexDegrees;
+		DataSet<Vertex<LongValue, Degrees>> degrees;
 
-		vertexDegrees = emptyGraph
+		degrees = emptyGraph
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, vertexDegrees.collect().size());
+		assertEquals(0, degrees.collect().size());
 
-		vertexDegrees = emptyGraph
+		degrees = emptyGraph
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -89,14 +89,18 @@ extends AsmTestBase {
 			"(1,(0,0,0))\n" +
 			"(2,(0,0,0))";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 	throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
+		DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new ChecksumHashCode<Vertex<LongValue, Degrees>>()
+			.run(degrees)
+			.execute();
 
 		assertEquals(902, checksum.getCount());
 		assertEquals(0x000001a3305dd86aL, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index c324106..95f83853 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -37,7 +37,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
+		DataSet<Vertex<IntValue, LongValue>> inDegree = directedSimpleGraph
 			.run(new VertexInDegree<IntValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -49,21 +49,21 @@ extends AsmTestBase {
 			"(4,1)\n" +
 			"(5,0)";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithEmptyGraph()
 			throws Exception {
-		DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+		DataSet<Vertex<LongValue, LongValue>> inDegree;
 
-		vertexDegrees = emptyGraph
+		inDegree = emptyGraph
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, vertexDegrees.collect().size());
+		assertEquals(0, inDegree.collect().size());
 
-		vertexDegrees = emptyGraph
+		inDegree = emptyGraph
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -72,17 +72,21 @@ extends AsmTestBase {
 			"(1,0)\n" +
 			"(2,0)";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		DataSet<Vertex<LongValue, LongValue>> inDegree = directedRMatGraph
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
-				.setIncludeZeroDegreeVertices(true)));
+				.setIncludeZeroDegreeVertices(true));
+		// element count and hashCode checksum over the in-degree DataSet
+		Checksum checksum = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+			.run(inDegree)
+			.execute();
 
-		assertEquals(902, inDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum());
+		assertEquals(902, checksum.getCount());
+		assertEquals(0x0000000000e1d885L, checksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index 8b5e981..7da3d8d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -37,7 +37,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
+		DataSet<Vertex<IntValue, LongValue>> outDegree = directedSimpleGraph
 			.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -49,21 +49,21 @@ extends AsmTestBase {
 			"(4,0)\n" +
 			"(5,1)";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithEmptyGraph()
 			throws Exception {
-		DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+		DataSet<Vertex<LongValue, LongValue>> outDegree;
 
-		vertexDegrees = emptyGraph
+		outDegree = emptyGraph
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, vertexDegrees.collect().size());
+		assertEquals(0, outDegree.collect().size());
 
-		vertexDegrees = emptyGraph
+		outDegree = emptyGraph
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -72,17 +72,21 @@ extends AsmTestBase {
 			"(1,0)\n" +
 			"(2,0)";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		DataSet<Vertex<LongValue, LongValue>> outDegree = directedRMatGraph
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
-				.setIncludeZeroDegreeVertices(true)));
+				.setIncludeZeroDegreeVertices(true));
+		// element count and hashCode checksum over the out-degree DataSet
+		Checksum checksum = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+			.run(outDegree)
+			.execute();
 
-		assertEquals(902, outDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum());
+		assertEquals(902, checksum.getCount());
+		assertEquals(0x0000000000e1d885L, checksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 3a8636a..476b3fe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
 			"(4,3,((null),1,4))\n" +
 			"(5,3,((null),1,4))";
 
-		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
 			.run(new EdgeDegreePair<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
 
-		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> targetDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = undirectedSimpleGraph
 			.run(new EdgeDegreePair<IntValue, NullValue, NullValue>()
 				.setReduceOnTargetId(true));
 
-		TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreePairOnTargetId.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
-			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>());
 
-		assertEquals(20884, sourceDegreeChecksum.getCount());
-		assertEquals(0x00000001e051efe4L, sourceDegreeChecksum.getChecksum());
+		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
+			.run(degreePairOnSourceId)
+			.execute();
 
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+		assertEquals(20884, checksumOnSourceId.getCount());
+		assertEquals(0x00000001e051efe4L, checksumOnSourceId.getChecksum());
+
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = undirectedRMatGraph
 			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
-				.setReduceOnTargetId(true)));
+				.setReduceOnTargetId(true));
+
+		Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
+			.run(degreePairOnTargetId)
+			.execute();
 
-		assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+		assertEquals(checksumOnSourceId, checksumOnTargetId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 9671461..0dc2178 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
 			"(4,3,((null),1))\n" +
 			"(5,3,((null),1))";
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
 			.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = undirectedSimpleGraph
 			.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>()
 				.setReduceOnTargetId(true));
 
-		TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(sourceDegreeOnTargetId.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
-			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
 
-		assertEquals(20884, sourceDegreeChecksum.getCount());
-		assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum());
+		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+			.run(sourceDegreeOnSourceId)
+			.execute();
 
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+		assertEquals(20884, checksumOnSourceId.getCount());
+		assertEquals(0x000000019d8f0070L, checksumOnSourceId.getChecksum());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = undirectedRMatGraph
 			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
-				.setReduceOnTargetId(true)));
+				.setReduceOnTargetId(true));
+
+		Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+			.run(sourceDegreeOnTargetId)
+			.execute();
 
-		assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+		assertEquals(checksumOnSourceId, checksumOnTargetId); // reducing on source or target ID must produce identical results
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index 54f2063..b14ddc0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
 			"(4,3,((null),4))\n" +
 			"(5,3,((null),4))";
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
 				.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
 
-		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegree = undirectedSimpleGraph
+		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = undirectedSimpleGraph
 			.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>()
 				.setReduceOnSourceId(true));
 
-		TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(targetDegreeOnSourceId.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
-			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()));
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()); // the analytic under test, default reduce on target ID
 
-		assertEquals(20884, sourceDegreeChecksum.getCount());
-		assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum());
+		Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+			.run(targetDegreeOnTargetId)
+			.execute();
 
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+		assertEquals(20884, checksumOnTargetId.getCount());
+		assertEquals(0x000000019d8f0070L, checksumOnTargetId.getChecksum());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = undirectedRMatGraph
 			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
-				.setReduceOnSourceId(true)));
+				.setReduceOnSourceId(true));
+
+		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+			.run(targetDegreeOnSourceId)
+			.execute();
 
-		assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+		assertEquals(checksumOnTargetId, checksumOnSourceId); // reducing on target or source ID must produce identical results
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index 37e1bb9..102beae 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -45,16 +45,16 @@ extends AsmTestBase {
 			"(4,1)\n" +
 			"(5,1)";
 
-		DataSet<Vertex<IntValue, LongValue>> sourceDegrees = undirectedSimpleGraph
+		DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
 			.run(new VertexDegree<IntValue, NullValue, NullValue>());
 
-		TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
 
-		DataSet<Vertex<IntValue, LongValue>> targetDegrees = undirectedSimpleGraph
+		DataSet<Vertex<IntValue, LongValue>> degreeOnTargetId = undirectedSimpleGraph
 			.run(new VertexDegree<IntValue, NullValue, NullValue>()
 				.setReduceOnTargetId(true));
 
-		TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreeOnTargetId.collect(), expectedResult);
 	}
 
 	@Test
@@ -62,18 +62,18 @@ extends AsmTestBase {
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
 
-		DataSet<Vertex<LongValue, LongValue>> sourceDegrees = completeGraph
+		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
 			.run(new VertexDegree<LongValue, NullValue, NullValue>());
 
-		for (Vertex<LongValue, LongValue> vertex : sourceDegrees.collect()) {
+		for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
 			assertEquals(expectedDegree, vertex.getValue().getValue());
 		}
 
-		DataSet<Vertex<LongValue, LongValue>> targetDegrees = completeGraph
+		DataSet<Vertex<LongValue, LongValue>> degreeOnTargetId = completeGraph
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
 				.setReduceOnTargetId(true));
 
-		for (Vertex<LongValue, LongValue> vertex : targetDegrees.collect()) {
+		for (Vertex<LongValue, LongValue> vertex : degreeOnTargetId.collect()) {
 			assertEquals(expectedDegree, vertex.getValue().getValue());
 		}
 	}
@@ -81,15 +81,15 @@ extends AsmTestBase {
 	@Test
 	public void testWithEmptyGraph()
 			throws Exception {
-		DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+		DataSet<Vertex<LongValue, LongValue>> degree;
 
-		vertexDegrees = emptyGraph
+		degree = emptyGraph
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, vertexDegrees.collect().size());
+		assertEquals(0, degree.collect().size());
 
-		vertexDegrees = emptyGraph
+		degree = emptyGraph
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -98,22 +98,30 @@ extends AsmTestBase {
 			"(1,0)\n" +
 			"(2,0)";
 
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degree.collect(), expectedResult);
 	}
 
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
-			.run(new VertexDegree<LongValue, NullValue, NullValue>()));
+		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph
+			.run(new VertexDegree<LongValue, NullValue, NullValue>());
+
+		Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+			.run(degreeOnSourceId)
+			.execute();
 
-		assertEquals(902, sourceDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1fb30L, sourceDegreeChecksum.getChecksum());
+		assertEquals(902, checksumOnSourceId.getCount());
+		assertEquals(0x0000000000e1fb30L, checksumOnSourceId.getChecksum());
 
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+		DataSet<Vertex<LongValue, LongValue>> degreeOnTargetId = undirectedRMatGraph
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
-				.setReduceOnTargetId(true)));
+				.setReduceOnTargetId(true));
+
+		Checksum checksumOnTargetId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+			.run(degreeOnTargetId)
+			.execute();
 
-		assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+		assertEquals(checksumOnSourceId, checksumOnTargetId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index ca96f24..55f7743 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.graph.asm.degree.filter.undirected;
 
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -62,7 +62,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		Utils.ChecksumHashCode checksum = undirectedRMatGraph
+		Checksum checksum = undirectedRMatGraph
 			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
 			.run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
 			.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
index d8d93ad..77d9dba 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
@@ -20,9 +20,8 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -76,8 +75,12 @@ extends AsmTestBase {
 	@Test
 	public void testRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
+		DataSet<Result<LongValue>> cc = directedRMatGraph
+			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<Result<LongValue>>()
+			.run(cc)
+			.execute();
 
 		assertEquals(902, checksum.getCount());
 		assertEquals(0x000001bf83866775L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
index 3c86358..6ae9b90 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -76,7 +76,9 @@ extends AsmTestBase {
 			.run(new TriangleListing<LongValue, NullValue, NullValue>()
 				.setSortTriangleVertices(true));
 
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+			.run(tl)
+			.execute();
 
 		assertEquals(75049, checksum.getCount());
 		assertEquals(0x00000033111f11baL, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index f5416fb..ba0834c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
@@ -76,8 +76,12 @@ extends AsmTestBase {
 	@Test
 	public void testRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
-			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
+		DataSet<Result<LongValue>> cc = undirectedRMatGraph
+			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+			.run(cc)
+			.execute();
 
 		assertEquals(902, checksum.getCount());
 		assertEquals(0x000001cab2d3677bL, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index 0d1ebd0..bc3914e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -58,7 +58,9 @@ extends AsmTestBase {
 		DataSet<Tuple3<LongValue, LongValue, LongValue>> tl = completeGraph
 			.run(new TriangleListing<LongValue, NullValue, NullValue>());
 
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+		Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+			.run(tl)
+			.execute();
 
 		assertEquals(expectedCount, checksum.getCount());
 	}
@@ -70,7 +72,9 @@ extends AsmTestBase {
 			.run(new TriangleListing<LongValue, NullValue, NullValue>()
 				.setSortTriangleVertices(true));
 
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+		Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+			.run(tl)
+			.execute();
 
 		assertEquals(75049, checksum.getCount());
 		assertEquals(0x00000001a5b500afL, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 679bf4f..2e5cebe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.library.link_analysis;
 
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -79,8 +79,12 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new HITS<LongValue, NullValue, NullValue>(0.000001)));
+		DataSet<Result<LongValue>> hits = directedRMatGraph
+			.run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+			.run(hits)
+			.execute();
 
 		assertEquals(902, checksum.getCount());
 		assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
index 73545fb..24f0c2d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.graph.library.metric;
 
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ extends AsmTestBase {
 			TestGraphUtils.getLongLongEdgeData(env),
 			env);
 
-		Utils.ChecksumHashCode checksum = graph
+		Checksum checksum = graph
 			.run(new ChecksumHashCode<Long, Long, Long>())
 			.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 58c986d..9490459 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph.library.similarity;
 
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -129,7 +129,9 @@ extends AsmTestBase {
 			.run(new JaccardIndex<LongValue, NullValue, NullValue>()
 				.setGroupSize(4));
 
-		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ji);
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+			.run(ji)
+			.execute();
 
 		assertEquals(13954, checksum.getCount());
 		assertEquals(0x00001b1a1f7a9d0bL, checksum.getChecksum());


[2/2] flink git commit: [FLINK-5694] [gelly] Collect DataSetAnalytic

Posted by gr...@apache.org.
[FLINK-5694] [gelly] Collect DataSetAnalytic

Adds a DataSetAnalytic that accumulates the elements of a DataSet and
returns them as a List. This mirrors the implementation of
DataSet.collect() but uses the analytic execution workflow.

This closes #3245


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49016225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49016225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49016225

Branch: refs/heads/master
Commit: 4901622594af7b727baf08be6ee82803e7c4e645
Parents: a7e5705
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jan 31 12:20:14 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:41 2017 -0500

----------------------------------------------------------------------
 .../apache/flink/graph/asm/dataset/Collect.java | 103 +++++++++++++++++++
 .../flink/graph/asm/dataset/CollectTest.java    |  55 ++++++++++
 2 files changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
new file mode 100644
index 0000000..4398296
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.AnalyticHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collect the elements of a {@link DataSet} into a {@link List}.
+ *
+ * <p>Each parallel task of the output serializes its records into a
+ * {@link SerializedListAccumulator}; the accumulated bytes are deserialized
+ * when {@link #getResult()} is called. This mirrors
+ * {@link DataSet#collect()} but follows the analytic workflow: call
+ * {@link #run(DataSet)}, execute the job, then read the result.
+ *
+ * @param <T> element type
+ */
+public class Collect<T>
+extends AbstractDataSetAnalytic<T, List<T>> {
+
+	private static final String COLLECT = "collect";
+
+	private CollectHelper<T> collectHelper;
+
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Attach a collecting output to the input {@link DataSet}.
+	 *
+	 * @param input the DataSet whose elements are collected
+	 * @return this analytic
+	 * @throws Exception propagated from the parent implementation
+	 */
+	@Override
+	public Collect<T> run(DataSet<T> input)
+			throws Exception {
+		super.run(input);
+
+		// Read the environment from the input rather than declaring a local
+		// named "env", which would shadow the inherited field used by getResult().
+		serializer = input.getType().createSerializer(input.getExecutionEnvironment().getConfig());
+
+		collectHelper = new CollectHelper<>(serializer);
+
+		input
+			.output(collectHelper)
+				.name("Collect");
+
+		return this;
+	}
+
+	/**
+	 * Deserialize and return the collected elements.
+	 *
+	 * <p>The elements are read back through the helper's accumulator, so the
+	 * job built by {@link #run(DataSet)} must have been executed first.
+	 *
+	 * @return the collected elements
+	 * @throws IllegalStateException if {@link #run(DataSet)} was never called
+	 * @throws RuntimeException if the accumulator is missing or cannot be deserialized
+	 */
+	@Override
+	public List<T> getResult() {
+		if (collectHelper == null) {
+			throw new IllegalStateException("run(DataSet) must be called before getResult()");
+		}
+
+		ArrayList<byte[]> accResult = collectHelper.getAccumulator(env, COLLECT);
+
+		if (accResult == null) {
+			throw new RuntimeException("Unable to retrieve the DataSet");
+		}
+
+		try {
+			return SerializedListAccumulator.deserializeList(accResult, serializer);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Cannot find type class of collected data type", e);
+		} catch (IOException e) {
+			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		}
+	}
+
+	/**
+	 * Output that serializes every received record into a
+	 * {@link SerializedListAccumulator} and registers it when the task closes.
+	 *
+	 * @param <U> element type
+	 */
+	private static class CollectHelper<U>
+	extends AnalyticHelper<U> {
+		private final TypeSerializer<U> serializer;
+
+		private SerializedListAccumulator<U> accumulator;
+
+		public CollectHelper(TypeSerializer<U> serializer) {
+			this.serializer = serializer;
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+			// Fresh accumulator per task. Every task registers under the same
+			// name; presumably the runtime merges them (verify against AnalyticHelper).
+			this.accumulator = new SerializedListAccumulator<>();
+		}
+
+		@Override
+		public void writeRecord(U record) throws IOException {
+			accumulator.add(record, serializer);
+		}
+
+		@Override
+		public void close() throws IOException {
+			addAccumulator(COLLECT, accumulator);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
new file mode 100644
index 0000000..ec1af42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class CollectTest {
+
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setup()
+			throws Exception {
+		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().enableObjectReuse();
+	}
+
+	@Test
+	public void testCollect()
+			throws Exception {
+		List<Long> list = Arrays.asList(ArrayUtils.toObject(
+			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+		DataSet<Long> dataset = env.fromCollection(list);
+
+		List<Long> collected = new Collect<Long>().run(dataset).execute();
+
+		assertArrayEquals(list.toArray(), collected.toArray());
+	}
+}