You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/02/06 15:45:35 UTC
[1/2] flink git commit: [FLINK-5693] [gelly] ChecksumHashCode
DataSetAnalytic
Repository: flink
Updated Branches:
refs/heads/master fffc8f055 -> 490162259
[FLINK-5693] [gelly] ChecksumHashCode DataSetAnalytic
Adds a DataSetAnalytic that checksums and counts elements from a
DataSet. This allows DataSetUtils.checksumHashCode to be deprecated.
This closes #3244
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7e5705c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7e5705c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7e5705c
Branch: refs/heads/master
Commit: a7e5705c1cd73b54f78348b4f6c3fc7250d7fd84
Parents: fffc8f0
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:18 2017 -0500
----------------------------------------------------------------------
.../graph/asm/dataset/ChecksumHashCode.java | 172 +++++++++++++++++++
.../graph/library/metric/ChecksumHashCode.java | 29 ++--
.../graph/asm/dataset/ChecksumHashCodeTest.java | 57 ++++++
.../annotate/directed/EdgeDegreesPairTest.java | 16 +-
.../directed/EdgeSourceDegreesTest.java | 16 +-
.../directed/EdgeTargetDegreesTest.java | 16 +-
.../annotate/directed/VertexDegreesTest.java | 30 ++--
.../annotate/directed/VertexInDegreeTest.java | 30 ++--
.../annotate/directed/VertexOutDegreeTest.java | 30 ++--
.../annotate/undirected/EdgeDegreePairTest.java | 34 ++--
.../undirected/EdgeSourceDegreeTest.java | 34 ++--
.../undirected/EdgeTargetDegreeTest.java | 34 ++--
.../annotate/undirected/VertexDegreeTest.java | 52 +++---
.../filter/undirected/MaximumDegreeTest.java | 4 +-
.../LocalClusteringCoefficientTest.java | 11 +-
.../directed/TriangleListingTest.java | 8 +-
.../LocalClusteringCoefficientTest.java | 12 +-
.../undirected/TriangleListingTest.java | 12 +-
.../graph/library/link_analysis/HITSTest.java | 12 +-
.../library/metric/ChecksumHashCodeTest.java | 4 +-
.../library/similarity/JaccardIndexTest.java | 8 +-
21 files changed, 459 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
new file mode 100644
index 0000000..13db7a0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SimpleAccumulator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+
+import java.io.IOException;
+
+/**
+ * Analytic which computes the count (number of elements) of a
+ * {@link DataSet} as well as the checksum (sum over element hash codes).
+ *
+ * @param <T> element type
+ */
+public class ChecksumHashCode<T>
+extends AbstractDataSetAnalytic<T, Checksum> {
+
+ private static final String CHECKSUM = "checksum";
+
+ private ChecksumHashCodeHelper<T> checksumHashCodeHelper;
+
+ @Override
+ public ChecksumHashCode<T> run(DataSet<T> input)
+ throws Exception {
+ super.run(input);
+
+ checksumHashCodeHelper = new ChecksumHashCodeHelper<>();
+
+ input
+ .output(checksumHashCodeHelper)
+ .name("ChecksumHashCode");
+
+ return this;
+ }
+
+ @Override
+ public Checksum getResult() {
+ return checksumHashCodeHelper.getAccumulator(env, CHECKSUM);
+ }
+
+ private static class ChecksumHashCodeHelper<U>
+ extends AnalyticHelper<U> {
+ private long count;
+ private long checksum;
+
+ @Override
+ public void writeRecord(U record) throws IOException {
+ count++;
+ // mask to the low 32 bits so a negative hashCode() is summed as a non-negative long
+ checksum += record.hashCode() & 0xffffffffL;
+ }
+
+ @Override
+ public void close() throws IOException {
+ addAccumulator(CHECKSUM, new Checksum(count, checksum));
+ }
+ }
+
+ public static class Checksum
+ implements SimpleAccumulator<Checksum> {
+ private long count;
+
+ private long checksum;
+
+ /**
+ * Instantiate a result with the given totals; add() and resetLocal() later mutate it in place.
+ *
+ * @param count count
+ * @param checksum checksum
+ */
+ public Checksum(long count, long checksum) {
+ this.count = count;
+ this.checksum = checksum;
+ }
+
+ /**
+ * Get the number of elements.
+ *
+ * @return number of elements
+ */
+ public long getCount() {
+ return count;
+ }
+
+ /**
+ * Get the checksum over the hashCode() of elements.
+ *
+ * @return checksum
+ */
+ public long getChecksum() {
+ return checksum;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(count)
+ .append(checksum)
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) { return false; }
+ if (obj == this) { return true; }
+ if (obj.getClass() != getClass()) { return false; }
+
+ Checksum rhs = (Checksum)obj;
+
+ return new EqualsBuilder()
+ .append(count, rhs.count)
+ .append(checksum, rhs.checksum)
+ .isEquals();
+ }
+
+ // Methods implementing SimpleAccumulator
+
+ @Override
+ public void add(Checksum value) {
+ count += value.count;
+ checksum += value.checksum;
+ }
+
+ @Override
+ public Checksum getLocalValue() {
+ return this;
+ }
+
+ @Override
+ public void resetLocal() {
+ count = 0;
+ checksum = 0;
+ }
+
+ @Override
+ public void merge(Accumulator<Checksum, Checksum> other) {
+ add(other.getLocalValue());
+ }
+
+ @Override
+ public Accumulator<Checksum, Checksum> clone() {
+ return new Checksum(count, checksum);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
index 5ba5a66..d2eeb41 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
@@ -18,13 +18,11 @@
package org.apache.flink.graph.library.metric;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
/**
* Convenience method to get the count (number of elements) of a Graph
@@ -37,35 +35,30 @@ import org.apache.flink.util.AbstractID;
* @param <EV> edge value type
*/
public class ChecksumHashCode<K, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> {
+extends AbstractGraphAnalytic<K, VV, EV, Checksum> {
- private String verticesId = new AbstractID().toString();
+ private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Vertex<K, VV>> vertexChecksum;
- private String edgesId = new AbstractID().toString();
+ private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Edge<K, EV>> edgeChecksum;
@Override
public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
- input
- .getVertices()
- .output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId))
- .name("ChecksumHashCode vertices");
+ vertexChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>();
+ vertexChecksum.run(input.getVertices());
- input
- .getEdges()
- .output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId))
- .name("ChecksumHashCode edges");
+ edgeChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>();
+ edgeChecksum.run(input.getEdges());
return this;
}
@Override
- public Utils.ChecksumHashCode getResult() {
- JobExecutionResult res = env.getLastJobExecutionResult();
- Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId);
- checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
+ public Checksum getResult() {
+ Checksum checksum = vertexChecksum.getResult();
+ checksum.add(edgeChecksum.getResult());
return checksum;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
new file mode 100644
index 0000000..d25f9b6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChecksumHashCodeTest {
+
+ private ExecutionEnvironment env;
+
+ @Before
+ public void setup()
+ throws Exception {
+ env = ExecutionEnvironment.createCollectionsEnvironment();
+ env.getConfig().enableObjectReuse();
+ }
+
+ @Test
+ public void testChecksumHashCode()
+ throws Exception {
+ List<Long> list = Arrays.asList(ArrayUtils.toObject(
+ new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+ DataSet<Long> dataset = env.fromCollection(list);
+
+ Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute();
+
+ assertEquals(list.size(), checksum.getCount());
+ assertEquals(list.size() * (list.size() - 1) / 2, checksum.getChecksum()); // Long.hashCode() of 0..n-1 is the value itself, so the checksum is n(n-1)/2
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 485794c..0540f14 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
"(3,4,((null),(4,2,2),(1,0,1)))\n" +
"(5,3,((null),(1,1,0),(4,2,2)))";
- DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degrees = directedSimpleGraph
+ DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedSimpleGraph
.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreesPair.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph
+ .run(new EdgeDegreesPair<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>>()
+ .run(degreesPair)
+ .execute();
assertEquals(12009, checksum.getCount());
assertEquals(0x0000176fe94702a3L, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 454e8d1..8d52889 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
"(3,4,((null),(4,2,2)))\n" +
"(5,3,((null),(1,1,0)))";
- DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedSimpleGraph
.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph
+ .run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
+ .run(sourceDegrees)
+ .execute();
assertEquals(12009, checksum.getCount());
assertEquals(0x0000162435fde1d9L, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 7add46b..eb0b892 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -48,17 +48,21 @@ extends AsmTestBase {
"(3,4,((null),(1,0,1)))\n" +
"(5,3,((null),(4,2,2)))";
- DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedSimpleGraph
.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph
+ .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
+ .run(targetDegrees)
+ .execute();
assertEquals(12009, checksum.getCount());
assertEquals(0x0000160af450cc81L, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index e635bfa..06737b5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -38,7 +38,7 @@ extends AsmTestBase {
@Test
public void testWithSimpleDirectedGraph()
throws Exception {
- DataSet<Vertex<IntValue, Degrees>> vertexDegrees = directedSimpleGraph
+ DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
.run(new VertexDegrees<IntValue, NullValue, NullValue>());
String expectedResult =
@@ -49,13 +49,13 @@ extends AsmTestBase {
"(4,(1,0,1))\n" +
"(5,(1,1,0))";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
}
@Test
public void testWithSimpleUndirectedGraph()
throws Exception {
- DataSet<Vertex<IntValue, Degrees>> vertexDegrees = undirectedSimpleGraph
+ DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
.run(new VertexDegrees<IntValue, NullValue, NullValue>());
String expectedResult =
@@ -66,21 +66,21 @@ extends AsmTestBase {
"(4,(1,1,1))\n" +
"(5,(1,1,1))";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
}
@Test
public void testWithEmptyGraph()
throws Exception {
- DataSet<Vertex<LongValue, Degrees>> vertexDegrees;
+ DataSet<Vertex<LongValue, Degrees>> degrees;
- vertexDegrees = emptyGraph
+ degrees = emptyGraph
.run(new VertexDegrees<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, vertexDegrees.collect().size());
+ assertEquals(0, degrees.collect().size());
- vertexDegrees = emptyGraph
+ degrees = emptyGraph
.run(new VertexDegrees<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -89,14 +89,18 @@ extends AsmTestBase {
"(1,(0,0,0))\n" +
"(2,(0,0,0))";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new VertexDegrees<LongValue, NullValue, NullValue>()));
+ DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph
+ .run(new VertexDegrees<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new ChecksumHashCode<Vertex<LongValue, Degrees>>()
+ .run(degrees)
+ .execute();
assertEquals(902, checksum.getCount());
assertEquals(0x000001a3305dd86aL, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index c324106..95f83853 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -37,7 +37,7 @@ extends AsmTestBase {
@Test
public void testWithSimpleGraph()
throws Exception {
- DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
+ DataSet<Vertex<IntValue, LongValue>> inDegree = directedSimpleGraph
.run(new VertexInDegree<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -49,21 +49,21 @@ extends AsmTestBase {
"(4,1)\n" +
"(5,0)";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
}
@Test
public void testWithEmptyGraph()
throws Exception {
- DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+ DataSet<Vertex<LongValue, LongValue>> inDegree;
- vertexDegrees = emptyGraph
+ inDegree = emptyGraph
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, vertexDegrees.collect().size());
+ assertEquals(0, inDegree.collect().size());
- vertexDegrees = emptyGraph
+ inDegree = emptyGraph
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -72,17 +72,21 @@ extends AsmTestBase {
"(1,0)\n" +
"(2,0)";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ DataSet<Vertex<LongValue, LongValue>> inDegree = directedRMatGraph
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(true)));
+ .setIncludeZeroDegreeVertices(true));
+
+ Checksum checksum = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+ .run(inDegree)
+ .execute();
- assertEquals(902, inDegreeChecksum.getCount());
- assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum());
+ assertEquals(902, checksum.getCount());
+ assertEquals(0x0000000000e1d885L, checksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index 8b5e981..7da3d8d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -37,7 +37,7 @@ extends AsmTestBase {
@Test
public void testWithSimpleGraph()
throws Exception {
- DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
+ DataSet<Vertex<IntValue, LongValue>> outDegree = directedSimpleGraph
.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -49,21 +49,21 @@ extends AsmTestBase {
"(4,0)\n" +
"(5,1)";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
}
@Test
public void testWithEmptyGraph()
throws Exception {
- DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+ DataSet<Vertex<LongValue, LongValue>> outDegree;
- vertexDegrees = emptyGraph
+ outDegree = emptyGraph
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, vertexDegrees.collect().size());
+ assertEquals(0, outDegree.collect().size());
- vertexDegrees = emptyGraph
+ outDegree = emptyGraph
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -72,17 +72,21 @@ extends AsmTestBase {
"(1,0)\n" +
"(2,0)";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ DataSet<Vertex<LongValue, LongValue>> outDegree = directedRMatGraph
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(true)));
+ .setIncludeZeroDegreeVertices(true));
+
+ Checksum checksum = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+ .run(outDegree)
+ .execute();
- assertEquals(902, outDegreeChecksum.getCount());
- assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum());
+ assertEquals(902, checksum.getCount());
+ assertEquals(0x0000000000e1d885L, checksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 3a8636a..476b3fe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
"(4,3,((null),1,4))\n" +
"(5,3,((null),1,4))";
- DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
.run(new EdgeDegreePair<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
- DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> targetDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = undirectedSimpleGraph
.run(new EdgeDegreePair<IntValue, NullValue, NullValue>()
.setReduceOnTargetId(true));
- TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreePairOnTargetId.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
- .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>());
- assertEquals(20884, sourceDegreeChecksum.getCount());
- assertEquals(0x00000001e051efe4L, sourceDegreeChecksum.getChecksum());
+ Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
+ .run(degreePairOnSourceId)
+ .execute();
- ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+ assertEquals(20884, checksumOnSourceId.getCount());
+ assertEquals(0x00000001e051efe4L, checksumOnSourceId.getChecksum());
+
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = undirectedRMatGraph
.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
- .setReduceOnTargetId(true)));
+ .setReduceOnTargetId(true));
+
+ Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
+ .run(degreePairOnTargetId)
+ .execute();
- assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+ assertEquals(checksumOnSourceId, checksumOnTargetId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 9671461..0dc2178 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
"(4,3,((null),1))\n" +
"(5,3,((null),1))";
- DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
- DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = undirectedSimpleGraph
.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>()
.setReduceOnTargetId(true));
- TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(sourceDegreeOnTargetId.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
- .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
- assertEquals(20884, sourceDegreeChecksum.getCount());
- assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum());
+ Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+ .run(sourceDegreeOnSourceId)
+ .execute();
- ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+ assertEquals(20884, checksumOnSourceId.getCount());
+ assertEquals(0x000000019d8f0070L, checksumOnSourceId.getChecksum());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = undirectedRMatGraph
.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
- .setReduceOnTargetId(true)));
+ .setReduceOnTargetId(true));
+
+ Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+ .run(sourceDegreeOnTargetId)
+ .execute();
- assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+ assertEquals(checksumOnSourceId, checksumOnTargetId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index 54f2063..b14ddc0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -54,31 +54,39 @@ extends AsmTestBase {
"(4,3,((null),4))\n" +
"(5,3,((null),4))";
- DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
- DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegree = undirectedSimpleGraph
+ DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = undirectedSimpleGraph
.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>()
.setReduceOnSourceId(true));
- TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(targetDegreeOnSourceId.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
- .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()));
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>());
- assertEquals(20884, sourceDegreeChecksum.getCount());
- assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum());
+ Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+ .run(targetDegreeOnTargetId)
+ .execute();
- ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+ assertEquals(20884, checksumOnTargetId.getCount());
+ assertEquals(0x000000019d8f0070L, checksumOnTargetId.getChecksum());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = undirectedRMatGraph
.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
- .setReduceOnSourceId(true)));
+ .setReduceOnSourceId(true));
+
+ Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
+ .run(targetDegreeOnSourceId)
+ .execute();
- assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+ assertEquals(checksumOnTargetId, checksumOnSourceId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index 37e1bb9..102beae 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -45,16 +45,16 @@ extends AsmTestBase {
"(4,1)\n" +
"(5,1)";
- DataSet<Vertex<IntValue, LongValue>> sourceDegrees = undirectedSimpleGraph
+ DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
.run(new VertexDegree<IntValue, NullValue, NullValue>());
- TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
- DataSet<Vertex<IntValue, LongValue>> targetDegrees = undirectedSimpleGraph
+ DataSet<Vertex<IntValue, LongValue>> degreeOnTargetId = undirectedSimpleGraph
.run(new VertexDegree<IntValue, NullValue, NullValue>()
.setReduceOnTargetId(true));
- TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreeOnTargetId.collect(), expectedResult);
}
@Test
@@ -62,18 +62,18 @@ extends AsmTestBase {
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
- DataSet<Vertex<LongValue, LongValue>> sourceDegrees = completeGraph
+ DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
.run(new VertexDegree<LongValue, NullValue, NullValue>());
- for (Vertex<LongValue, LongValue> vertex : sourceDegrees.collect()) {
+ for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
assertEquals(expectedDegree, vertex.getValue().getValue());
}
- DataSet<Vertex<LongValue, LongValue>> targetDegrees = completeGraph
+ DataSet<Vertex<LongValue, LongValue>> degreeOnTargetId = completeGraph
.run(new VertexDegree<LongValue, NullValue, NullValue>()
.setReduceOnTargetId(true));
- for (Vertex<LongValue, LongValue> vertex : targetDegrees.collect()) {
+ for (Vertex<LongValue, LongValue> vertex : degreeOnTargetId.collect()) {
assertEquals(expectedDegree, vertex.getValue().getValue());
}
}
@@ -81,15 +81,15 @@ extends AsmTestBase {
@Test
public void testWithEmptyGraph()
throws Exception {
- DataSet<Vertex<LongValue, LongValue>> vertexDegrees;
+ DataSet<Vertex<LongValue, LongValue>> degree;
- vertexDegrees = emptyGraph
+ degree = emptyGraph
.run(new VertexDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, vertexDegrees.collect().size());
+ assertEquals(0, degree.collect().size());
- vertexDegrees = emptyGraph
+ degree = emptyGraph
.run(new VertexDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -98,22 +98,30 @@ extends AsmTestBase {
"(1,0)\n" +
"(2,0)";
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degree.collect(), expectedResult);
}
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
- .run(new VertexDegree<LongValue, NullValue, NullValue>()));
+ DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph
+ .run(new VertexDegree<LongValue, NullValue, NullValue>());
+
+ Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+ .run(degreeOnSourceId)
+ .execute();
- assertEquals(902, sourceDegreeChecksum.getCount());
- assertEquals(0x0000000000e1fb30L, sourceDegreeChecksum.getChecksum());
+ assertEquals(902, checksumOnSourceId.getCount());
+ assertEquals(0x0000000000e1fb30L, checksumOnSourceId.getChecksum());
- ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
+ DataSet<Vertex<LongValue, LongValue>> degreeOnTargetId = undirectedRMatGraph
.run(new VertexDegree<LongValue, NullValue, NullValue>()
- .setReduceOnTargetId(true)));
+ .setReduceOnTargetId(true));
+
+ Checksum checksumOnTargetId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
+ .run(degreeOnTargetId)
+ .execute();
- assertEquals(sourceDegreeChecksum, targetDegreeChecksum);
+ assertEquals(checksumOnSourceId, checksumOnTargetId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index ca96f24..55f7743 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.graph.asm.degree.filter.undirected;
-import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.metric.ChecksumHashCode;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -62,7 +62,7 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- Utils.ChecksumHashCode checksum = undirectedRMatGraph
+ Checksum checksum = undirectedRMatGraph
.run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
.run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
index d8d93ad..77d9dba 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
@@ -20,9 +20,8 @@ package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -76,8 +75,12 @@ extends AsmTestBase {
@Test
public void testRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
+ DataSet<Result<LongValue>> cc = directedRMatGraph
+ .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<Result<LongValue>>()
+ .run(cc)
+ .execute();
assertEquals(902, checksum.getCount());
assertEquals(0x000001bf83866775L, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
index 3c86358..6ae9b90 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.directed;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -76,7 +76,9 @@ extends AsmTestBase {
.run(new TriangleListing<LongValue, NullValue, NullValue>()
.setSortTriangleVertices(true));
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+ Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+ .run(tl)
+ .execute();
assertEquals(75049, checksum.getCount());
assertEquals(0x00000033111f11baL, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index f5416fb..ba0834c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.undirected;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
@@ -76,8 +76,12 @@ extends AsmTestBase {
@Test
public void testRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(undirectedRMatGraph
- .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()));
+ DataSet<Result<LongValue>> cc = undirectedRMatGraph
+ .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+ Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+ .run(cc)
+ .execute();
assertEquals(902, checksum.getCount());
assertEquals(0x000001cab2d3677bL, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index 0d1ebd0..bc3914e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph.library.clustering.undirected;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -58,7 +58,9 @@ extends AsmTestBase {
DataSet<Tuple3<LongValue, LongValue, LongValue>> tl = completeGraph
.run(new TriangleListing<LongValue, NullValue, NullValue>());
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+ Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+ .run(tl)
+ .execute();
assertEquals(expectedCount, checksum.getCount());
}
@@ -70,7 +72,9 @@ extends AsmTestBase {
.run(new TriangleListing<LongValue, NullValue, NullValue>()
.setSortTriangleVertices(true));
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl);
+ Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+ .run(tl)
+ .execute();
assertEquals(75049, checksum.getCount());
assertEquals(0x00000001a5b500afL, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 679bf4f..2e5cebe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.library.link_analysis;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -79,8 +79,12 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new HITS<LongValue, NullValue, NullValue>(0.000001)));
+ DataSet<Result<LongValue>> hits = directedRMatGraph
+ .run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+
+ Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+ .run(hits)
+ .execute();
assertEquals(902, checksum.getCount());
assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
index 73545fb..24f0c2d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.graph.library.metric;
-import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.test.TestGraphUtils;
import org.junit.Test;
@@ -36,7 +36,7 @@ extends AsmTestBase {
TestGraphUtils.getLongLongEdgeData(env),
env);
- Utils.ChecksumHashCode checksum = graph
+ Checksum checksum = graph
.run(new ChecksumHashCode<Long, Long, Long>())
.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 58c986d..9490459 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph.library.similarity;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -129,7 +129,9 @@ extends AsmTestBase {
.run(new JaccardIndex<LongValue, NullValue, NullValue>()
.setGroupSize(4));
- ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ji);
+ Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
+ .run(ji)
+ .execute();
assertEquals(13954, checksum.getCount());
assertEquals(0x00001b1a1f7a9d0bL, checksum.getChecksum());
[2/2] flink git commit: [FLINK-5694] [gelly] Collect DataSetAnalytic
Posted by gr...@apache.org.
[FLINK-5694] [gelly] Collect DataSetAnalytic
Adds a DataSetAnalytic that accumulates elements and returns elements
using a List. This mirrors the implementation of DataSet.collect() but
using the analytic execution workflow.
This closes #3245
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49016225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49016225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49016225
Branch: refs/heads/master
Commit: 4901622594af7b727baf08be6ee82803e7c4e645
Parents: a7e5705
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jan 31 12:20:14 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:41 2017 -0500
----------------------------------------------------------------------
.../apache/flink/graph/asm/dataset/Collect.java | 103 +++++++++++++++++++
.../flink/graph/asm/dataset/CollectTest.java | 55 ++++++++++
2 files changed, 158 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
new file mode 100644
index 0000000..4398296
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.AnalyticHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collect the elements of a {@link DataSet} into a {@link List}. Records are gathered
+ * by an output helper into a {@link SerializedListAccumulator} and deserialized on retrieval.
+ * @param <T> element type
+ */
+public class Collect<T>
+extends AbstractDataSetAnalytic<T, List<T>> {
+
+ private static final String COLLECT = "collect"; // name under which the accumulator is registered and later retrieved
+
+ private CollectHelper<T> collectHelper; // created in run(); queried for the accumulator in getResult()
+
+ private TypeSerializer<T> serializer; // created in run(); reused in getResult() to deserialize the records
+
+ @Override
+ public Collect<T> run(DataSet<T> input) // attaches the collecting output to input and returns this for chaining
+ throws Exception {
+ super.run(input);
+
+ ExecutionEnvironment env = input.getExecutionEnvironment();
+ serializer = input.getType().createSerializer(env.getConfig()); // serializer built from the input's type and the environment config
+
+ collectHelper = new CollectHelper<>(serializer);
+
+ input
+ .output(collectHelper)
+ .name("Collect");
+
+ return this;
+ }
+
+ @Override
+ public List<T> getResult() { // returns the deserialized records; every failure path throws RuntimeException
+ ArrayList<byte[]> accResult = collectHelper.getAccumulator(env, COLLECT); // NOTE(review): env is not declared in this class; presumably inherited from AbstractDataSetAnalytic - confirm
+ if (accResult != null) {
+ try {
+ return SerializedListAccumulator.deserializeList(accResult, serializer);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot find type class of collected data type", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Serialization error while deserializing collected data", e);
+ }
+ } else {
+ throw new RuntimeException("Unable to retrieve the DataSet"); // no accumulator result was available under COLLECT
+ }
+ }
+
+ private static class CollectHelper<U> // output helper that adds every written record to an accumulator
+ extends AnalyticHelper<U> {
+ private SerializedListAccumulator<U> accumulator; // assigned in open(), filled in writeRecord(), published in close()
+
+ private final TypeSerializer<U> serializer; // passed to the accumulator with each added record
+
+ public CollectHelper(TypeSerializer<U> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) {
+ this.accumulator = new SerializedListAccumulator<>(); // a fresh accumulator on every open() call
+ }
+
+ @Override
+ public void writeRecord(U record) throws IOException {
+ accumulator.add(record, serializer);
+ }
+
+ @Override
+ public void close() throws IOException {
+ addAccumulator(COLLECT, accumulator); // register the gathered records under the COLLECT name
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
new file mode 100644
index 0000000..ec1af42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class CollectTest { // checks that Collect returns the elements of a small DataSet
+
+ private ExecutionEnvironment env; // re-created before each test by setup()
+
+ @Before
+ public void setup()
+ throws Exception {
+ env = ExecutionEnvironment.createCollectionsEnvironment();
+ env.getConfig().enableObjectReuse(); // the test runs with object reuse enabled
+ }
+
+ @Test
+ public void testCollect()
+ throws Exception {
+ List<Long> list = Arrays.asList(ArrayUtils.toObject( // expected values: boxed longs 0 through 9
+ new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+ DataSet<Long> dataset = env.fromCollection(list);
+
+ List<Long> collected = new Collect<Long>().run(dataset).execute(); // run the analytic and fetch its result
+
+ assertArrayEquals(list.toArray(), collected.toArray()); // element-by-element comparison, so order must match too
+ }
+}