You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 20:34:57 UTC
flink git commit: [FLINK-4949] [gelly] Refactor Gelly driver inputs
Repository: flink
Updated Branches:
refs/heads/master 31e120a98 -> f1ff99fdc
[FLINK-4949] [gelly] Refactor Gelly driver inputs
The Gelly drivers started as simple wrappers around library algorithms
but have grown to handle a matrix of input sources while often running
multiple algorithms and analytics with custom parameterization.
The monolithic drivers are replaced with separate inputs and algorithms.
Command-line parameter parsers are shared and reusable across inputs and
algorithms. Algorithm results now implement a common AlgorithmResult
interface. Drivers are now tested with integration tests.
This closes #3294
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1ff99fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1ff99fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1ff99fd
Branch: refs/heads/master
Commit: f1ff99fdc1e228acd936f5684832d5cf49bdbe04
Parents: 31e120a
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 15:57:54 2017 -0400
----------------------------------------------------------------------
docs/dev/libs/gelly/index.md | 40 ++-
flink-libraries/flink-gelly-examples/pom.xml | 9 +-
.../java/org/apache/flink/graph/Runner.java | 357 +++++++++++++++++++
.../main/java/org/apache/flink/graph/Usage.java | 71 ----
.../org/apache/flink/graph/RunnerITCase.java | 122 +++++++
.../flink/graph/drivers/AdamicAdarITCase.java | 52 +++
.../drivers/ClusteringCoefficientITCase.java | 89 +++++
.../drivers/ConnectedComponentsITCase.java | 65 ++++
.../flink/graph/drivers/DriverBaseITCase.java | 185 ++++++++++
.../flink/graph/drivers/EdgeListITCase.java | 240 +++++++++++++
.../flink/graph/drivers/GraphMetricsITCase.java | 100 ++++++
.../apache/flink/graph/drivers/HITSITCase.java | 52 +++
.../flink/graph/drivers/JaccardIndexITCase.java | 63 ++++
.../flink/graph/drivers/PageRankITCase.java | 52 +++
.../graph/drivers/TriangleListingITCase.java | 107 ++++++
15 files changed, 1518 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 8f8c6de..40018e8 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -73,7 +73,7 @@ Running Gelly Examples
The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from
-[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly).
+[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)).
To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
Flink's **lib** directory.
@@ -83,21 +83,29 @@ cp opt/flink-gelly_*.jar lib/
cp opt/flink-gelly-scala_*.jar lib/
~~~
-Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After
-configuring and starting the cluster, list the available algorithm classes:
+Gelly's examples jar includes drivers for each of the library methods. After configuring and starting the cluster, list
+the available algorithm classes:
~~~bash
./bin/start-cluster.sh
./bin/flink run opt/flink-gelly-examples_*.jar
~~~
-The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
-edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a
-directed generated graph:
+The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access
+to the input file). The algorithm description, available inputs and outputs, and configuration are displayed when an
+algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index):
~~~bash
-./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
- --directed true --input rmat
+./bin/flink run opt/flink-gelly-examples_*.jar --algorithm JaccardIndex
+~~~
+
+Display [graph metrics](./library_methods.html#metric) for a million vertex graph:
+
+~~~bash
+./bin/flink run opt/flink-gelly-examples_*.jar \
+ --algorithm GraphMetrics --order directed \
+ --input RMatGraph --type integer --scale 20 --simplify directed \
+ --output print
~~~
The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
@@ -111,15 +119,19 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
~~~bash
wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
-./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
- --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+ --algorithm GraphMetrics --order undirected \
+ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
+ --output print
-./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \
- --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+ --algorithm ClusteringCoefficient --order undirected \
+ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
--output hash
-./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/flink-gelly-examples_*.jar \
- --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+ --algorithm JaccardIndex \
+ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
--output hash
~~~
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index b533119..e95aa37 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -83,6 +83,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -163,7 +170,7 @@
<configuration>
<archive>
<manifestEntries>
- <Main-Class>org.apache.flink.graph.Usage</Main-Class>
+ <Main-Class>org.apache.flink.graph.Runner</Main-Class>
</manifestEntries>
</archive>
</configuration>
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
new file mode 100644
index 0000000..0324814
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.AdamicAdar;
+import org.apache.flink.graph.drivers.ClusteringCoefficient;
+import org.apache.flink.graph.drivers.ConnectedComponents;
+import org.apache.flink.graph.drivers.Driver;
+import org.apache.flink.graph.drivers.EdgeList;
+import org.apache.flink.graph.drivers.GraphMetrics;
+import org.apache.flink.graph.drivers.HITS;
+import org.apache.flink.graph.drivers.JaccardIndex;
+import org.apache.flink.graph.drivers.PageRank;
+import org.apache.flink.graph.drivers.TriangleListing;
+import org.apache.flink.graph.drivers.input.CompleteGraph;
+import org.apache.flink.graph.drivers.input.CycleGraph;
+import org.apache.flink.graph.drivers.input.EmptyGraph;
+import org.apache.flink.graph.drivers.input.GridGraph;
+import org.apache.flink.graph.drivers.input.HypercubeGraph;
+import org.apache.flink.graph.drivers.input.Input;
+import org.apache.flink.graph.drivers.input.PathGraph;
+import org.apache.flink.graph.drivers.input.RMatGraph;
+import org.apache.flink.graph.drivers.input.SingletonEdgeGraph;
+import org.apache.flink.graph.drivers.input.StarGraph;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This default main class executes Flink drivers.
+ *
+ * An execution has one input, one algorithm, and one output. Anything more
+ * complex can be expressed as a user program written in a JVM language.
+ *
+ * Inputs and algorithms are decoupled by, respectively, producing and
+ * consuming a graph. Currently only {@code Graph} is supported but later
+ * updates may add support for new graph types such as {@code BipartiteGraph}.
+ *
+ * Algorithms must explicitly support each type of output via implementation of
+ * interfaces. This is scalable as the number of outputs is small and finite.
+ */
+public class Runner {
+
+ private static final String INPUT = "input";
+
+ private static final String ALGORITHM = "algorithm";
+
+ private static final String OUTPUT = "output";
+
+ private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
+ .addClass(CompleteGraph.class)
+ .addClass(org.apache.flink.graph.drivers.input.CSV.class)
+ .addClass(CycleGraph.class)
+ .addClass(EmptyGraph.class)
+ .addClass(GridGraph.class)
+ .addClass(HypercubeGraph.class)
+ .addClass(PathGraph.class)
+ .addClass(RMatGraph.class)
+ .addClass(SingletonEdgeGraph.class)
+ .addClass(StarGraph.class);
+
+ private static ParameterizedFactory<Driver> driverFactory = new ParameterizedFactory<Driver>()
+ .addClass(AdamicAdar.class)
+ .addClass(ClusteringCoefficient.class)
+ .addClass(ConnectedComponents.class)
+ .addClass(EdgeList.class)
+ .addClass(GraphMetrics.class)
+ .addClass(HITS.class)
+ .addClass(JaccardIndex.class)
+ .addClass(PageRank.class)
+ .addClass(TriangleListing.class);
+
+ /**
+ * List available algorithms. This is displayed to the user when no valid
+ * algorithm is given in the program parameterization.
+ *
+ * @return usage string listing available algorithms
+ */
+ private static String getAlgorithmsListing() {
+ StrBuilder strBuilder = new StrBuilder();
+
+ strBuilder
+ .appendNewLine()
+ .appendln("Select an algorithm to view usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm <algorithm>")
+ .appendNewLine()
+ .appendln("Available algorithms:");
+
+ for (Driver algorithm : driverFactory) {
+ strBuilder.append(" ")
+ .appendFixedWidthPadRight(algorithm.getName(), 30, ' ')
+ .append(algorithm.getShortDescription()).appendNewLine();
+ }
+
+ return strBuilder.toString();
+ }
+
+ /**
+ * Display the usage for the given algorithm. This includes options for all
+ * compatible inputs, the selected algorithm, and outputs implemented by
+ * the selected algorithm.
+ *
+ * @param algorithmName unique identifier of the selected algorithm
+ * @return usage string for the given algorithm
+ */
+ private static String getAlgorithmUsage(String algorithmName) {
+ StrBuilder strBuilder = new StrBuilder();
+
+ Driver algorithm = driverFactory.get(algorithmName);
+
+ strBuilder
+ .appendNewLine()
+ .appendNewLine()
+ .appendln(algorithm.getLongDescription())
+ .appendNewLine()
+ .append("usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm ")
+ .append(algorithmName)
+ .append(" [algorithm options] --input <input> [input options] --output <output> [output options]")
+ .appendNewLine()
+ .appendNewLine()
+ .appendln("Available inputs:");
+
+ for (Input input : inputFactory) {
+ strBuilder
+ .append(" --input ")
+ .append(input.getName())
+ .append(" ")
+ .appendln(input.getUsage());
+ }
+
+ String algorithmParameterization = algorithm.getUsage();
+
+ if (algorithmParameterization.length() > 0) {
+ strBuilder
+ .appendNewLine()
+ .appendln("Algorithm configuration:")
+ .append(" ")
+ .appendln(algorithm.getUsage());
+ }
+
+ strBuilder
+ .appendNewLine()
+ .appendln("Available outputs:");
+
+ if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+ strBuilder.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+ }
+
+ if (algorithm instanceof Hash) {
+ strBuilder.appendln(" --output hash");
+ }
+
+ if (algorithm instanceof Print) {
+ strBuilder.appendln(" --output print");
+ }
+
+ return strBuilder
+ .appendNewLine()
+ .toString();
+ }
+
+ public static void main(String[] args) throws Exception {
+ // Set up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // should not have any non-Flink data types
+ env.getConfig().disableAutoTypeRegistration();
+ env.getConfig().disableForceAvro();
+ env.getConfig().disableForceKryo();
+
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
+
+ // integration tests run with with object reuse both disabled and enabled
+ if (parameters.has("__disable_object_reuse")) {
+ env.getConfig().disableObjectReuse();
+ } else {
+ env.getConfig().enableObjectReuse();
+ }
+
+ // Usage
+
+ if (!parameters.has(ALGORITHM)) {
+ throw new ProgramParametrizationException(getAlgorithmsListing());
+ }
+
+ String algorithmName = parameters.get(ALGORITHM);
+ Driver algorithm = driverFactory.get(algorithmName);
+
+ if (algorithm == null) {
+ throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName);
+ }
+
+ if (!parameters.has(INPUT)) {
+ if (!parameters.has(OUTPUT)) {
+ // if neither input nor output is given then print algorithm usage
+ throw new ProgramParametrizationException(getAlgorithmUsage(algorithmName));
+ }
+ throw new ProgramParametrizationException("No input given");
+ }
+
+ String inputName = parameters.get(INPUT);
+ Input input = inputFactory.get(inputName);
+
+ if (input == null) {
+ throw new ProgramParametrizationException("Unknown input type: " + inputName);
+ }
+
+ // Input
+
+ input.configure(parameters);
+ Graph graph = input.create(env);
+
+ // Algorithm
+
+ algorithm.configure(parameters);
+ algorithm.plan(graph);
+
+ // Output
+ if (!parameters.has(OUTPUT)) {
+ throw new ProgramParametrizationException("No output given");
+ }
+
+ String outputName = parameters.get(OUTPUT);
+ String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> ";
+
+ System.out.println();
+
+ switch (outputName.toLowerCase()) {
+ case "csv":
+ if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+ String filename = parameters.getRequired("output_filename");
+
+ String lineDelimiter = StringEscapeUtils.unescapeJava(
+ parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+ String fieldDelimiter = StringEscapeUtils.unescapeJava(
+ parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+ org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm;
+ c.writeCSV(filename, lineDelimiter, fieldDelimiter);
+
+ env.execute(executionNamePrefix + "CSV");
+ } else {
+ throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'");
+ }
+ break;
+
+ case "hash":
+ if (algorithm instanceof Hash) {
+ Hash h = (Hash) algorithm;
+ h.hash(executionNamePrefix + "Hash");
+ } else {
+ throw new ProgramParametrizationException("Algorithm does not support output type 'hash'");
+ }
+ break;
+
+ case "print":
+ if (algorithm instanceof Print) {
+ Print h = (Print) algorithm;
+ h.print(executionNamePrefix + "Print");
+ } else {
+ throw new ProgramParametrizationException("Algorithm does not support output type 'print'");
+ }
+ break;
+
+ default:
+ throw new ProgramParametrizationException("Unknown output type: " + outputName);
+ }
+ }
+
+ /**
+ * Stores a list of classes for which an instance can be requested by name
+ * and implements an iterator over class instances.
+ *
+ * @param <T> base type for stored classes
+ */
+ private static class ParameterizedFactory<T extends Parameterized>
+ implements Iterable<T> {
+ private List<Class<? extends T>> classes = new ArrayList<>();
+
+ /**
+ * Add a class to the factory.
+ *
+ * @param cls subclass of T
+ * @return this
+ */
+ public ParameterizedFactory<T> addClass(Class<? extends T> cls) {
+ this.classes.add(cls);
+ return this;
+ }
+
+ /**
+ * Obtain a class instance by name.
+ *
+ * @param name String matching getName()
+ * @return class instance or null if no matching class
+ */
+ public T get(String name) {
+ for (T instance : this) {
+ if (name.equals(instance.getName())) {
+ return instance;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<T>() {
+ private int index;
+
+ @Override
+ public boolean hasNext() {
+ return index < classes.size();
+ }
+
+ @Override
+ public T next() {
+ return InstantiationUtil.instantiate(classes.get(index++));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
deleted file mode 100644
index 642fe5b..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.flink.client.program.ProgramParametrizationException;
-
-/**
- * This default main class prints usage listing available classes.
- */
-public class Usage {
-
- private static final Class[] DRIVERS = new Class[]{
- org.apache.flink.graph.drivers.ClusteringCoefficient.class,
- org.apache.flink.graph.drivers.GraphMetrics.class,
- org.apache.flink.graph.drivers.HITS.class,
- org.apache.flink.graph.drivers.JaccardIndex.class,
- org.apache.flink.graph.drivers.TriangleListing.class,
- };
-
- private static final Class[] EXAMPLES = new Class[]{
- org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
- org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
- org.apache.flink.graph.examples.IncrementalSSSP.class,
- org.apache.flink.graph.examples.MusicProfiles.class,
- org.apache.flink.graph.examples.PregelSSSP.class,
- org.apache.flink.graph.examples.SingleSourceShortestPaths.class,
- org.apache.flink.graph.scala.examples.ConnectedComponents.class,
- org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class,
- org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
- };
-
- private static String getUsage() {
- StrBuilder strBuilder = new StrBuilder();
-
- strBuilder.appendNewLine();
- strBuilder.appendln("Driver classes call algorithms from the Gelly library:");
- for (Class cls : DRIVERS) {
- strBuilder.append(" ").appendln(cls.getName());
- }
-
- strBuilder.appendNewLine();
- strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:");
- for (Class cls : EXAMPLES) {
- strBuilder.append(" ").appendln(cls.getName());
- }
-
- return strBuilder.toString();
- }
-
- public static void main(String[] args) throws Exception {
- // this exception is throw to prevent Flink from printing an error message
- throw new ProgramParametrizationException(getUsage());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
new file mode 100644
index 0000000..a48fdf1
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.DriverBaseITCase;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RunnerITCase
+extends DriverBaseITCase {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ public RunnerITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testWithoutAlgorithm() throws Exception {
+ String expected = "Select an algorithm to view usage:";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(new String[]{}, expected);
+ }
+
+ @Test
+ public void testWithUnknownAlgorithm() throws Exception {
+ String expected = "Unknown algorithm name: NotAnAlgorithm";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(new String[]{"--algorithm", "NotAnAlgorithm"}, expected);
+ }
+
+ @Test
+ public void testAlgorithmUsage() throws Exception {
+ String expected = "Pass-through of the graph's edge list.";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(new String[]{"--algorithm", "EdgeList"}, expected);
+ }
+
+ @Test
+ public void testWithoutInput() throws Exception {
+ String expected = "No input given";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--output", "NotAnOutput"},
+ expected);
+ }
+
+ @Test
+ public void testWithUnknownInput() throws Exception {
+ String expected = "Unknown input type: NotAnInput";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "NotAnInput"},
+ expected);
+ }
+
+ @Test
+ public void testWithoutOutput() throws Exception {
+ String expected = "No output given";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph"},
+ expected);
+ }
+
+ @Test
+ public void testWithUnknownOutput() throws Exception {
+ String expected = "Unknown output type: NotAnOutput";
+
+ thrown.expect(ProgramParametrizationException.class);
+ thrown.expectMessage(expected);
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph",
+ "--output", "NotAnOutput"},
+ expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
new file mode 100644
index 0000000..400c241
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AdamicAdarITCase
+extends DriverBaseITCase {
+
+ public AdamicAdarITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new AdamicAdar().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "AdamicAdar"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "AdamicAdar",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "print"},
+ 221628);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
new file mode 100644
index 0000000..f215b91
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ClusteringCoefficientITCase
+extends DriverBaseITCase {
+
+ public ClusteringCoefficientITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new ClusteringCoefficient().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "ClusteringCoefficient"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "ChecksumHashCode 0x000001c0409df6c0, count 902\n" +
+ "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+ "vertex count: 902, average clustering coefficient: 0.32943748[0-9]+\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "print"},
+ 904);
+ }
+
+ @Test
+ public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "ChecksumHashCode 0x000001ccf8c45fdb, count 902\n" +
+ "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+ "vertex count: 902, average clustering coefficient: 0.42173070[0-9]+\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "print"},
+ 904);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..b91abb3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase
+extends DriverBaseITCase {
+
+ public ConnectedComponentsITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new ConnectedComponents().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "ConnectedComponents"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\\nChecksumHashCode 0x0000000000cdc7e7, count 838\\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "ConnectedComponents",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+ "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "ConnectedComponents",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+ "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+ "--output", "print"},
+ 838);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
new file mode 100644
index 0000000..d19ca97
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.graph.Runner;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.regex.Pattern;
+
+public abstract class DriverBaseITCase
+extends MultipleProgramsTestBase {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ protected DriverBaseITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ // extend MultipleProgramsTestBase default to include object reuse mode
+ @Parameterized.Parameters(name = "Execution mode = {0}")
+ public static Collection<Object[]> executionModes() {
+ return Arrays.asList(
+ new Object[] { TestExecutionMode.CLUSTER },
+ new Object[] { TestExecutionMode.CLUSTER_OBJECT_REUSE },
+ new Object[] { TestExecutionMode.COLLECTION });
+ }
+
+ /**
+ * Simpler variant of {@link #expectedOutput(String[], String)}
+ * that only compares the count of the number of records in standard output.
+ * This is intended for use for algorithms where the result cannot be
+ * hashed due to approximate results (typically floating point arithmetic).
+ *
+ * @param parameters algorithm, input, and output arguments
+ * @param records expected number of records in standard output
+ * @throws Exception on error
+ */
+ protected void expectedCount(String[] parameters, int records) throws Exception {
+ String output = getSystemOutput(parameters);
+
+ // subtract the extra newline
+ int numberOfRecords = output.split(System.getProperty("line.separator")).length - 1;
+ Assert.assertEquals(records, numberOfRecords);
+ }
+
+ /**
+ * Executes the driver with the provided arguments and compares the
+ * standard output with the given regular expression.
+ *
+ * @param parameters algorithm, input, and output arguments
+ * @param expected expected standard output
+ * @throws Exception on error
+ */
+ protected void expectedOutput(String[] parameters, String expected) throws Exception {
+ String output = getSystemOutput(parameters);
+
+ Assert.assertThat(output, RegexMatcher.matchesRegex(expected));
+ }
+
+ /**
+ * Executes the driver with the provided arguments and compares the
+ * exception and exception method with the given class and regular
+ * expression.
+ *
+ * @param parameters algorithm, input, and output arguments
+ * @param expected expected standard output
+ * @param exception expected exception
+ * @throws Exception on error when not matching exception
+ */
+ protected void expectedOutputFromException(String[] parameters, String expected,Class<? extends Throwable> exception) throws Exception {
+ expectedException.expect(exception);
+ expectedException.expectMessage(RegexMatcher.matchesRegex(expected));
+
+ getSystemOutput(parameters);
+ }
+
+ /**
+ * Generate a regular expression string by quoting the input string and
+ * adding wildcard matchers to the beginning and end.
+ *
+ * @param input source string
+ * @return regex string
+ */
+ protected String regexSubstring(String input) {
+ // Pattern.quote disables regex interpretation of the input string and
+ // flag expression "(?s)" (Pattern.DOTALL) matches "." against any
+ // character including line terminators
+ return "(?s).*" + Pattern.quote(input) + ".*";
+ }
+
+ /**
+ * Capture the command-line standard output from the driver execution.
+ *
+ * @param args driver command-line arguments
+ * @return standard output from driver execution
+ * @throws Exception on error
+ */
+ private String getSystemOutput(String[] args) throws Exception {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ // Configure object reuse mode
+ switch (mode) {
+ case CLUSTER:
+ case COLLECTION:
+ args = (String[])ArrayUtils.add(args, "--__disable_object_reuse");
+ break;
+
+ case CLUSTER_OBJECT_REUSE:
+ // object reuse is enabled by default when executing drivers
+ break;
+
+ default:
+ throw new FlinkRuntimeException("Unknown execution mode " + mode);
+ }
+
+ // Redirect stdout
+ PrintStream stdout = System.out;
+ System.setOut(new PrintStream(output));
+
+ Runner.main(args);
+
+ // Restore stdout
+ System.setOut(stdout);
+
+ return output.toString();
+ }
+
+ /**
+ * Implements a Hamcrest regex matcher. Hamcrest 2.0 provides
+ * Matchers.matchesPattern(String) but Flink depends on Hamcrest 1.3.
+ *
+ * see http://stackoverflow.com/a/25021229
+ */
+ private static class RegexMatcher
+ extends TypeSafeMatcher<String> {
+ private final String regex;
+
+ private RegexMatcher(final String regex) {
+ this.regex = regex;
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendText("matches regex=`" + regex + "`");
+ }
+
+ @Override
+ public boolean matchesSafely(final String string) {
+ return string.matches(regex);
+ }
+
+ public static RegexMatcher matchesRegex(final String regex) {
+ return new RegexMatcher(regex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
new file mode 100644
index 0000000..d9cac8b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class EdgeListITCase
+extends DriverBaseITCase {
+
+ public EdgeListITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new EdgeList().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "EdgeList"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testHashWithCompleteGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000000006788c22, count 1722\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "CompleteGraph", "--vertex_count", "42",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithCycleGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x000000000050cea4, count 84\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "CycleGraph", "--vertex_count", "42",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithEmptyGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000000000000000, count 0\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "EmptyGraph", "--vertex_count", "42",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithGridGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000000357d33a6, count 2990\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "GridGraph", "--dim0", "5:true", "--dim1", "8:false", "--dim2", "13:true",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithHypercubeGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000000014a72800, count 2048\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "HypercubeGraph", "--dimensions", "8",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithPathGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000000004ee21a, count 82\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "PathGraph", "--vertex_count", "42",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000000ed469103, count 16384\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "integer",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatIntegerDirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000000c53bfc9b, count 12009\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatIntegerUndirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000001664eb9e4, count 20884\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatLongGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000000116ee9103, count 16384\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "long",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testPrintWithRMatLongGraph() throws Exception {
+
+ expectedCount(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "long",
+ "--output", "print"},
+ 16384);
+ }
+
+ @Test
+ public void testHashWithRMatLongDirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000000e3c4643b, count 12009\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "long", "--simplify", "directed",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatLongUndirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x000000019b67ae64, count 20884\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "long", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatStringGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x00000071dc80a623, count 16384\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "string",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatStringDirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000005d58b3fa7d, count 12009\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "string", "--simplify", "directed",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithRMatStringUndirectedGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x000000aa54987304, count 20884\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "RMatGraph", "--type", "string", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithSingletonEdgeGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0000000001af8ee8, count 200\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "SingletonEdgeGraph", "--vertex_pair_count", "100",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testHashWithStarGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x000000000042789a, count 82\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "EdgeList",
+ "--input", "StarGraph", "--vertex_count", "42",
+ "--output", "hash"},
+ expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
new file mode 100644
index 0000000..a5ea486
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphMetricsITCase
+extends DriverBaseITCase {
+
+ public GraphMetricsITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new GraphMetrics().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "GraphMetrics"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testWithDirectedRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "Vertex metrics:\n" +
+ " vertex count: 902\n" +
+ " edge count: 12,009\n" +
+ " unidirectional edge count: 8,875\n" +
+ " bidirectional edge count: 1,567\n" +
+ " average degree: 13.314\n" +
+ " density: 0.01477663\n" +
+ " triplet count: 1,003,442\n" +
+ " maximum degree: 463\n" +
+ " maximum out degree: 334\n" +
+ " maximum in degree: 342\n" +
+ " maximum triplets: 106,953\n" +
+ "\n" +
+ "Edge metrics:\n" +
+ " triangle triplet count: 107,817\n" +
+ " rectangle triplet count: 315,537\n" +
+ " maximum triangle triplets: 820\n" +
+ " maximum rectangle triplets: 3,822\n";
+
+ String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output"};
+
+ expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+ expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+ }
+
+ @Test
+ public void testWithUndirectedRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "Vertex metrics:\n" +
+ " vertex count: 902\n" +
+ " edge count: 10,442\n" +
+ " average degree: 23.153\n" +
+ " density: 0.025697\n" +
+ " triplet count: 1,003,442\n" +
+ " maximum degree: 463\n" +
+ " maximum triplets: 106,953\n" +
+ "\n" +
+ "Edge metrics:\n" +
+ " triangle triplet count: 107,817\n" +
+ " rectangle triplet count: 315,537\n" +
+ " maximum triangle triplets: 820\n" +
+ " maximum rectangle triplets: 3,822\n";
+
+ String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output"};
+
+ expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+ expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
new file mode 100644
index 0000000..5474d1b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HITSITCase
+extends DriverBaseITCase {
+
+ public HITSITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new HITS().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "HITS"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "HITS",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "print"},
+ 902);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
new file mode 100644
index 0000000..0632856
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JaccardIndexITCase
+extends DriverBaseITCase {
+
+ public JaccardIndexITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new JaccardIndex().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "JaccardIndex"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "JaccardIndex",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "JaccardIndex",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "print"},
+ 221628);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
new file mode 100644
index 0000000..d7301d0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase
+extends DriverBaseITCase {
+
+ public PageRankITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new PageRank().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "PageRank"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "PageRank",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "print"},
+ 902);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
new file mode 100644
index 0000000..0d2897c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TriangleListingITCase
+extends DriverBaseITCase {
+
+ public TriangleListingITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testLongDescription() throws Exception {
+ String expected = regexSubstring(new TriangleListing().getLongDescription());
+
+ expectedOutputFromException(
+ new String[]{"--algorithm", "TriangleListing"},
+ expected,
+ ProgramParametrizationException.class);
+ }
+
+ @Test
+ public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+ "Triadic census:\n" +
+ " 003: 113,435,893\n" +
+ " 012: 6,632,528\n" +
+ " 102: 983,535\n" +
+ " 021d: 118,574\n" +
+ " 021u: 118,566\n" +
+ " 021c: 237,767\n" +
+ " 111d: 129,773\n" +
+ " 111u: 130,041\n" +
+ " 030t: 16,981\n" +
+ " 030c: 5,535\n" +
+ " 201: 43,574\n" +
+ " 120d: 7,449\n" +
+ " 120u: 7,587\n" +
+ " 120c: 15,178\n" +
+ " 210: 17,368\n" +
+ " 300: 4,951\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "TriangleListing", "--order", "directed",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+ "--output", "print"},
+ 75049);
+ }
+
+ @Test
+ public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+ String expected = "\n" +
+ "ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+ "Triadic census:\n" +
+ " 03: 113,435,893\n" +
+ " 12: 7,616,063\n" +
+ " 21: 778,295\n" +
+ " 30: 75,049\n";
+
+ expectedOutput(
+ new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "hash"},
+ expected);
+ }
+
+ @Test
+ public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+ expectedCount(
+ new String[]{"--algorithm", "TriangleListing", "--order", "undirected",
+ "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+ "--output", "print"},
+ 75049);
+ }
+}