You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/01/31 16:50:42 UTC

flink git commit: [FLINK-5558] [gelly] Replace TriangleCount with a Count analytic

Repository: flink
Updated Branches:
  refs/heads/master 5ca243752 -> 087d191ff


[FLINK-5558] [gelly] Replace TriangleCount with a Count analytic

TriangleCount can be replaced by a generic Count analytic for DataSet.
The analytics currently using TriangleCount can simply use
TriangleListing and Count.

Gelly includes both directed and undirected versions of TriangleListing
and therefore two versions of TriangleCount which will be replaced by a
single Count analytic which can be reused elsewhere.

This closes #3169


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/087d191f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/087d191f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/087d191f

Branch: refs/heads/master
Commit: 087d191ff7ffaa33ebd719cba4aea08917c4618f
Parents: 5ca2437
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jan 31 11:49:08 2017 -0500

----------------------------------------------------------------------
 .../org/apache/flink/graph/GraphAnalytic.java   |  2 +-
 .../asm/dataset/AbstractDataSetAnalytic.java    | 58 ++++++++++++++
 .../apache/flink/graph/asm/dataset/Count.java   | 72 ++++++++++++++++++
 .../graph/asm/dataset/DataSetAnalytic.java      | 74 ++++++++++++++++++
 .../directed/GlobalClusteringCoefficient.java   | 14 +++-
 .../clustering/directed/TriangleCount.java      | 79 -------------------
 .../undirected/GlobalClusteringCoefficient.java | 15 +++-
 .../clustering/undirected/TriadicCensus.java    | 15 +++-
 .../clustering/undirected/TriangleCount.java    | 80 --------------------
 .../flink/graph/asm/dataset/CountTest.java      | 55 ++++++++++++++
 .../clustering/directed/TriangleCountTest.java  | 75 ------------------
 .../undirected/TriangleCountTest.java           | 75 ------------------
 12 files changed, 292 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
index 0bdc792..e72b853 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.DataSet;
 
 /**
  * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal
- * and results are retrieved via accumulators.  A Flink program has a single
+ * and results are retrieved via accumulators. A Flink program has a single
  * point of execution. A {@code GraphAnalytic} defers execution to the user to
  * allow composing multiple analytics and algorithms into a single program.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
new file mode 100644
index 0000000..46007ca
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link DataSetAnalytic}.
+ *
+ * @param <T> element type
+ * @param <R> the return type
+ */
+public abstract class AbstractDataSetAnalytic<T, R>
+implements DataSetAnalytic<T, R> {
+
+	protected ExecutionEnvironment env;
+
+	@Override
+	public AbstractDataSetAnalytic<T, R> run(DataSet<T> input)
+			throws Exception {
+		env = input.getExecutionEnvironment();
+		return this;
+	}
+
+	@Override
+	public R execute()
+			throws Exception {
+		env.execute();
+		return getResult();
+	}
+
+	@Override
+	public R execute(String jobName)
+			throws Exception {
+		Preconditions.checkNotNull(jobName);
+
+		env.execute(jobName);
+		return getResult();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
new file mode 100644
index 0000000..7303d3a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.AnalyticHelper;
+
+import java.io.IOException;
+
+/**
+ * Count the number of elements in a {@link DataSet}.
+ *
+ * @param <T> element type
+ */
+public class Count<T>
+extends AbstractDataSetAnalytic<T, Long> {
+
+	private static final String COUNT = "count";
+
+	private CountHelper<T> countHelper;
+
+	@Override
+	public Count<T> run(DataSet<T> input)
+			throws Exception {
+		super.run(input);
+
+		countHelper = new CountHelper<>();
+
+		input
+			.output(countHelper)
+				.name("Count");
+
+		return this;
+	}
+
+	@Override
+	public Long getResult() {
+		return countHelper.getAccumulator(env, COUNT);
+	}
+
+	private static class CountHelper<U>
+	extends AnalyticHelper<U> {
+		private long count;
+
+		@Override
+		public void writeRecord(U record) throws IOException {
+			count++;
+		}
+
+		@Override
+		public void close() throws IOException {
+			addAccumulator(COUNT, new LongCounter(count));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
new file mode 100644
index 0000000..abf4039
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+
+/**
+ * A {@code DataSetAnalytic} is similar to a {@link CustomUnaryOperation} but
+ * is terminal and results are retrieved via accumulators. A Flink program has
+ * a single point of execution. A {@code DataSetAnalytic} defers execution to
+ * the user to allow composing multiple analytics and algorithms into a single
+ * program.
+ *
+ * @param <T> element type
+ * @param <R> the return type
+ */
+public interface DataSetAnalytic<T, R> {
+
+	/**
+	 * This method must be called after the program has executed:
+	 *  1) "run" analytics and algorithms
+	 *  2) call ExecutionEnvironment.execute()
+	 *  3) get analytic results
+	 *
+	 * @return the result
+	 */
+	R getResult();
+
+	/**
+	 * Execute the program and return the result.
+	 *
+	 * @return the result
+	 * @throws Exception
+	 */
+	R execute() throws Exception;
+
+	/**
+	 * Execute the program and return the result.
+	 *
+	 * @param jobName the name to assign to the job
+	 * @return the result
+	 * @throws Exception
+	 */
+	R execute(String jobName) throws Exception;
+
+	/**
+	 * All {@code DataSetAnalytic} processing must be terminated by an
+	 * {@link OutputFormat} and obtained via accumulators rather than
+	 * returned by a {@link DataSet}.
+	 *
+	 * @param input input dataset
+	 * @return this
+	 * @throws Exception
+	 */
+	DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index 271e080..9e0b203 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -20,9 +20,11 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 
@@ -39,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-	private TriangleCount<K, VV, EV> triangleCount;
+	private Count<TriangleListing.Result<K>> triangleCount;
 
 	private VertexMetrics<K, VV, EV> vertexMetrics;
 
@@ -70,10 +72,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			throws Exception {
 		super.run(input);
 
-		triangleCount = new TriangleCount<K, VV, EV>()
-			.setLittleParallelism(littleParallelism);
+		triangleCount = new Count<>();
 
-		input.run(triangleCount);
+		DataSet<TriangleListing.Result<K>> triangles = input
+			.run(new TriangleListing<K, VV, EV>()
+				.setSortTriangleVertices(false)
+				.setLittleParallelism(littleParallelism));
+
+		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
 			.setParallelism(littleParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
deleted file mode 100644
index 82b1c8e..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.clustering.directed;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.CountHelper;
-import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.AbstractID;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Count the number of distinct triangles in an undirected graph.
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @see TriangleListing
- */
-public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Long> {
-
-	private String id = new AbstractID().toString();
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
-	@Override
-	public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
-			throws Exception {
-		super.run(input);
-
-		DataSet<TriangleListing.Result<K>> triangles = input
-			.run(new TriangleListing<K, VV, EV>()
-				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
-
-		triangles
-			.output(new CountHelper<TriangleListing.Result<K>>(id))
-				.name("Count triangles");
-
-		return this;
-	}
-
-	@Override
-	public Long getResult() {
-		return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index c11cb3c..b24155b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -20,9 +20,12 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 
@@ -39,7 +42,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-	private TriangleCount<K, VV, EV> triangleCount;
+	private Count<Tuple3<K, K, K>> triangleCount;
 
 	private VertexMetrics<K, VV, EV> vertexMetrics;
 
@@ -70,10 +73,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			throws Exception {
 		super.run(input);
 
-		triangleCount = new TriangleCount<K, VV, EV>()
-			.setLittleParallelism(littleParallelism);
+		triangleCount = new Count<>();
 
-		input.run(triangleCount);
+		DataSet<Tuple3<K, K, K>> triangles = input
+			.run(new TriangleListing<K, VV, EV>()
+				.setSortTriangleVertices(false)
+				.setLittleParallelism(littleParallelism));
+
+		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
 			.setParallelism(littleParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index f057803..3f59d0e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -20,8 +20,11 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.dataset.Count;
+import org.apache.flink.graph.library.clustering.directed.TriangleListing;
 import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
@@ -48,7 +51,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-	private TriangleCount<K, VV, EV> triangleCount;
+	private Count<TriangleListing.Result<K>> triangleCount;
 
 	private VertexMetrics<K, VV, EV> vertexMetrics;
 
@@ -72,10 +75,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			throws Exception {
 		super.run(input);
 
-		triangleCount = new TriangleCount<K, VV, EV>()
-			.setLittleParallelism(littleParallelism);
+		triangleCount = new Count<>();
 
-		input.run(triangleCount);
+		DataSet<TriangleListing.Result<K>> triangles = input
+			.run(new TriangleListing<K, VV, EV>()
+				.setSortTriangleVertices(false)
+				.setLittleParallelism(littleParallelism));
+
+		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
 			.setParallelism(littleParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
deleted file mode 100644
index 46e1875..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.clustering.undirected;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.CountHelper;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.AbstractID;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Count the number of distinct triangles in an undirected graph.
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @see TriangleListing
- */
-public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Long> {
-
-	private String id = new AbstractID().toString();
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
-	@Override
-	public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
-			throws Exception {
-		super.run(input);
-
-		DataSet<Tuple3<K, K, K>> triangles = input
-			.run(new TriangleListing<K, VV, EV>()
-				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
-
-		triangles
-			.output(new CountHelper<Tuple3<K, K, K>>(id))
-				.name("Count triangles");
-
-		return this;
-	}
-
-	@Override
-	public Long getResult() {
-		return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
new file mode 100644
index 0000000..476c2e6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class CountTest {
+
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setup()
+			throws Exception {
+		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().enableObjectReuse();
+	}
+
+	@Test
+	public void testCount()
+			throws Exception {
+		List<Long> list = Arrays.asList(ArrayUtils.toObject(
+			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+		DataSet<Long> dataset = env.fromCollection(list);
+
+		long count = new Count<Long>().run(dataset).execute();
+
+		assertEquals(list.size(), count);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleCountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleCountTest.java
deleted file mode 100644
index cd4ddf4..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleCountTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.clustering.directed;
-
-import org.apache.commons.math3.util.CombinatoricsUtils;
-import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TriangleCountTest
-extends AsmTestBase {
-
-	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>()
-			.run(directedSimpleGraph)
-			.execute();
-
-		assertEquals(2, triangleCount);
-	}
-
-	@Test
-	public void testWithCompleteGraph()
-			throws Exception {
-		long expectedDegree = completeGraphVertexCount - 1;
-		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
-
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(completeGraph)
-			.execute();
-
-		assertEquals(expectedCount, triangleCount);
-	}
-
-	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(emptyGraph)
-			.execute();
-
-		assertEquals(0, triangleCount);
-	}
-
-	@Test
-	public void testWithRMatGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(directedRMatGraph)
-			.execute();
-
-		assertEquals(75049, triangleCount);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/087d191f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
deleted file mode 100644
index 6bf9b0d..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.clustering.undirected;
-
-import org.apache.commons.math3.util.CombinatoricsUtils;
-import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TriangleCountTest
-extends AsmTestBase {
-
-	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>()
-			.run(undirectedSimpleGraph)
-			.execute();
-
-		assertEquals(2, triangleCount);
-	}
-
-	@Test
-	public void testWithCompleteGraph()
-			throws Exception {
-		long expectedDegree = completeGraphVertexCount - 1;
-		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
-
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(completeGraph)
-			.execute();
-
-		assertEquals(expectedCount, triangleCount);
-	}
-
-	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(emptyGraph)
-			.execute();
-
-		assertEquals(0, triangleCount);
-	}
-
-	@Test
-	public void testWithRMatGraph()
-			throws Exception {
-		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
-			.run(undirectedRMatGraph)
-			.execute();
-
-		assertEquals(75049, triangleCount);
-	}
-}