You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:27 UTC
[4/6] flink git commit: [FLINK-5912] [gelly] Inputs for CSV and graph
generators
[FLINK-5912] [gelly] Inputs for CSV and graph generators
Create Input classes for reading graphs from CSV as well as for each of
the graph generators.
This closes #3626
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded25be4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded25be4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded25be4
Branch: refs/heads/master
Commit: ded25be4d8fc51f2a58cbd3264daffe48dca6b04
Parents: 963f46e
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Mar 27 10:08:59 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 11:16:05 2017 -0400
----------------------------------------------------------------------
.../apache/flink/graph/drivers/input/CSV.java | 115 ++++++++++++
.../graph/drivers/input/CompleteGraph.java | 60 +++++++
.../flink/graph/drivers/input/CycleGraph.java | 60 +++++++
.../flink/graph/drivers/input/EmptyGraph.java | 55 ++++++
.../flink/graph/drivers/input/GridGraph.java | 144 +++++++++++++++
.../graph/drivers/input/HypercubeGraph.java | 60 +++++++
.../flink/graph/drivers/input/PathGraph.java | 60 +++++++
.../flink/graph/drivers/input/RMatGraph.java | 174 +++++++++++++++++++
.../graph/drivers/input/SingletonEdgeGraph.java | 60 +++++++
.../flink/graph/drivers/input/StarGraph.java | 60 +++++++
.../graph/drivers/parameter/Parameterized.java | 2 +-
.../drivers/parameter/ParameterizedBase.java | 2 +-
.../flink/graph/drivers/parameter/Simplify.java | 137 +++++++++++++++
.../graph/drivers/parameter/SimplifyTest.java | 56 ++++++
14 files changed, 1043 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
new file mode 100644
index 0000000..58b65b6
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Read a {@link Graph} from a CSV file using {@link IntValue}, {@link LongValue},
+ * or {@link StringValue} keys as selected by the "type" parameter.
+ *
+ * @param <K> key type
+ */
+public class CSV<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+	private static final String INTEGER = "integer";
+
+	private static final String LONG = "long";
+
+	private static final String STRING = "string";
+
+	private final ChoiceParameter type = new ChoiceParameter(this, "type")
+		.setDefaultValue(INTEGER)
+		.addChoices(LONG, STRING);
+
+	private final StringParameter inputFilename = new StringParameter(this, "input_filename");
+
+	private final StringParameter commentPrefix = new StringParameter(this, "comment_prefix")
+		.setDefaultValue("#");
+
+	private final StringParameter lineDelimiter = new StringParameter(this, "input_line_delimiter")
+		.setDefaultValue(CsvInputFormat.DEFAULT_LINE_DELIMITER);
+
+	private final StringParameter fieldDelimiter = new StringParameter(this, "input_field_delimiter")
+		.setDefaultValue(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+
+	private final Simplify simplify = new Simplify(this);
+
+	@Override
+	public String getName() {
+		return CSV.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename.getValue() + ")";
+	}
+
+	/**
+	 * Read the configured CSV file with the selected key type and simplify the result.
+	 *
+	 * @param env execution environment
+	 * @return input graph
+	 */
+	@Override
+	// unchecked casts: the caller's K is expected to match the key class selected by the "type" parameter
+	@SuppressWarnings("unchecked")
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
+		GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env)
+			.ignoreCommentsEdges(commentPrefix.getValue())
+			.lineDelimiterEdges(lineDelimiter.getValue())
+			.fieldDelimiterEdges(fieldDelimiter.getValue());
+
+		Graph<K, NullValue, NullValue> graph;
+
+		switch (type.getValue()) {
+			case INTEGER:
+				graph = (Graph<K, NullValue, NullValue>) reader.keyType(IntValue.class);
+				break;
+
+			case LONG:
+				graph = (Graph<K, NullValue, NullValue>) reader.keyType(LongValue.class);
+				break;
+
+			case STRING:
+				graph = (Graph<K, NullValue, NullValue>) reader.keyType(StringValue.class);
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+		}
+
+		return simplify.simplify(graph);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
new file mode 100644
index 0000000..c31c5a8
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Input producing a {@link org.apache.flink.graph.generator.CompleteGraph} from the "vertex_count" and "little_parallelism" parameters.
+ */
+public class CompleteGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private final LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return CompleteGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return new StringBuilder(getName()).append(" (").append(vertexCount.getValue()).append(")").toString();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CompleteGraph generator =
+			new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue());
+		return generator.setParallelism(littleParallelism.getValue().intValue()).generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
new file mode 100644
index 0000000..df66dab
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CycleGraph} configured by the "vertex_count" and "little_parallelism" parameters.
+ */
+public class CycleGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private final LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return CycleGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexCount.getValue() + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CycleGraph generator =
+			new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue());
+		return generator.setParallelism(littleParallelism.getValue().intValue()).generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
new file mode 100644
index 0000000..794317b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.graph.generator.EmptyGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate an {@link org.apache.flink.graph.generator.EmptyGraph} configured by the "vertex_count" parameter.
+ */
+public class EmptyGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private final LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	@Override
+	public String getName() {
+		return EmptyGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexCount.getValue() + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		long count = vertexCount.getValue();
+		return new org.apache.flink.graph.generator.EmptyGraph(env, count).generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
new file mode 100644
index 0000000..d502215
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.GridGraph} from "dimN" parameters formatted as size:wrap_endpoints.
+ */
+public class GridGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private static final String PREFIX = "dim";
+
+	private final List<Dimension> dimensions = new ArrayList<>();
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return GridGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getUsage() {
+		return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage();
+	}
+
+	@Override
+	public void configure(ParameterTool parameterTool) throws ProgramParametrizationException {
+		super.configure(parameterTool);
+
+		// the sorted map orders dimensions by numeric ID (dim0, dim1, dim2, ...)
+		Map<Integer, String> dimensionMap = new TreeMap<>();
+
+		for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {
+			String key = entry.getKey();
+			if (key.startsWith(PREFIX)) {
+				try {
+					dimensionMap.put(Integer.parseInt(key.substring(PREFIX.length())), entry.getValue());
+				} catch (NumberFormatException ex) {
+					// report a malformed dimension ID as a parameter error rather than a raw NumberFormatException
+					throw new ProgramParametrizationException(
+						"Grid dimension '" + key + "' must be named '" + PREFIX + "' followed by an integer ID");
+				}
+			}
+		}
+
+		// then store dimensions in order
+		for (String field : dimensionMap.values()) {
+			dimensions.add(new Dimension(field));
+		}
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + dimensions + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env);
+		for (Dimension dimension : dimensions) {
+			graph.addDimension(dimension.size, dimension.wrapEndpoints);
+		}
+
+		return graph.setParallelism(littleParallelism.getValue().intValue()).generate();
+	}
+
+	/**
+	 * Stores and parses the size and endpoint wrapping configuration for a
+	 * {@link org.apache.flink.graph.generator.GridGraph} dimension.
+	 */
+	private static class Dimension {
+		private final long size;
+
+		private final boolean wrapEndpoints;
+
+		/**
+		 * Parse a configuration string; the size integer and endpoint wrapping boolean must be separated by a colon.
+		 *
+		 * @param field configuration string
+		 * @throws ProgramParametrizationException if the field is not two colon-separated parts or the size is not an integer
+		 */
+		public Dimension(String field) {
+			// a field without a colon yields a single part and is rejected by the length check
+			String[] parts = field.split(":");
+			if (parts.length != 2) {
+				throw malformed(field);
+			}
+
+			try {
+				size = Long.parseLong(parts[0]);
+			} catch (NumberFormatException ex) {
+				throw malformed(field);
+			}
+			wrapEndpoints = Boolean.parseBoolean(parts[1]);
+		}
+
+		// Builds the error reported for a malformed dimension configuration string.
+		private static ProgramParametrizationException malformed(String field) {
+			return new ProgramParametrizationException("Grid dimension must use " +
+				"a colon to separate the integer size and boolean indicating whether the dimension endpoints are " +
+				"connected: '" + field + "'");
+		}
+
+		@Override
+		public String toString() {
+			return Long.toString(size) + (wrapEndpoints ? "+" : "\u229e");
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
new file mode 100644
index 0000000..3b87b00
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.HypercubeGraph} configured by the "dimensions" and "little_parallelism" parameters.
+ */
+public class HypercubeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private final LongParameter dimensions = new LongParameter(this, "dimensions")
+		.setMinimumValue(MINIMUM_DIMENSIONS);
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return HypercubeGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + dimensions.getValue() + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.HypercubeGraph generator =
+			new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue());
+		return generator.setParallelism(littleParallelism.getValue().intValue()).generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
new file mode 100644
index 0000000..968628c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.PathGraph} configured by the "vertex_count" and "little_parallelism" parameters.
+ */
+public class PathGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private final LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return PathGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexCount.getValue() + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.PathGraph generator =
+			new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue());
+		return generator.setParallelism(littleParallelism.getValue().intValue()).generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
new file mode 100644
index 0000000..e4e6a4c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue},
+ * or {@link StringValue} keys as selected by the "type" parameter.
+ *
+ * @see org.apache.flink.graph.generator.RMatGraph
+ *
+ * @param <K> key type
+ */
+public class RMatGraph<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+	private static final String INTEGER = "integer";
+
+	private static final String LONG = "long";
+
+	private static final String STRING = "string";
+
+	private final ChoiceParameter type = new ChoiceParameter(this, "type")
+		.setDefaultValue(INTEGER)
+		.addChoices(LONG, STRING);
+
+	// generate graph with 2^scale vertices
+	private final LongParameter scale = new LongParameter(this, "scale")
+		.setDefaultValue(10)
+		.setMinimumValue(1);
+
+	// generate graph with edgeFactor * 2^scale edges
+	private final LongParameter edgeFactor = new LongParameter(this, "edge_factor")
+		.setDefaultValue(16)
+		.setMinimumValue(1);
+
+	// matrix parameters "a", "b", "c", and implicitly "d = 1 - a - b - c"
+	// describe the skew in the recursive matrix
+	private final DoubleParameter a = new DoubleParameter(this, "a")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_A)
+		.setMinimumValue(0.0, false);
+
+	private final DoubleParameter b = new DoubleParameter(this, "b")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_B)
+		.setMinimumValue(0.0, false);
+
+	private final DoubleParameter c = new DoubleParameter(this, "c")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_C)
+		.setMinimumValue(0.0, false);
+
+	// noise randomly perturbs the matrix parameters for each successive bit
+	// for each generated edge
+	private final BooleanParameter noiseEnabled = new BooleanParameter(this, "noise_enabled");
+
+	private final DoubleParameter noise = new DoubleParameter(this, "noise")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_NOISE)
+		.setMinimumValue(0.0, true)
+		.setMaximumValue(2.0, true);
+
+	private final Simplify simplify = new Simplify(this);
+
+	private final LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return RMatGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + WordUtils.capitalize(type.getValue()) +
+			" (s" + scale.getValue() + "e" + edgeFactor.getValue() + simplify.getShortString() + ")";
+	}
+
+	/**
+	 * Generate the graph as configured, translate its IDs to the selected key type, and simplify the result.
+	 *
+	 * @param env Flink execution environment
+	 * @return input graph
+	 */
+	@Override
+	// unchecked casts: the caller's K is expected to match the key class selected by the "type" parameter
+	@SuppressWarnings("unchecked")
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
+		int lp = littleParallelism.getValue().intValue();
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		// NOTE(review): this shift runs before the scale checks below and overflows for scale >= 63 - confirm the bounds
+		long vertexCount = 1L << scale.getValue();
+		long edgeCount = vertexCount * edgeFactor.getValue();
+
+		Graph<LongValue, NullValue, NullValue> rmatGraph = new org.apache.flink.graph.generator.RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue())
+			.setNoise(noiseEnabled.getValue(), noise.getValue().floatValue())
+			.setParallelism(lp)
+			.generate();
+
+		Graph<K, NullValue, NullValue> graph;
+
+		switch (type.getValue()) {
+			case INTEGER:
+				if (scale.getValue() > 32) {
+					throw new ProgramParametrizationException("Scale '" + scale.getValue() + "' must be no greater than 32 for type 'integer'");
+				}
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph
+					.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()));
+				break;
+
+			case LONG:
+				if (scale.getValue() > 64) {
+					throw new ProgramParametrizationException("Scale '" + scale.getValue() + "' must be no greater than 64 for type 'long'");
+				}
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph;
+				break;
+
+			case STRING:
+				// scale bound is same as LONG since keys are generated as LongValue
+				if (scale.getValue() > 64) {
+					throw new ProgramParametrizationException("Scale '" + scale.getValue() + "' must be no greater than 64 for type 'string'");
+				}
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph
+					.run(new TranslateGraphIds<LongValue, StringValue, NullValue, NullValue>(new LongValueToStringValue()));
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+		}
+
+		// simplify *after* the translation from LongValue to IntValue or
+		// StringValue to improve the performance of the simplify operators
+		return simplify.simplify(graph);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
new file mode 100644
index 0000000..502826f
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.SingletonEdgeGraph}.
+ */
+public class SingletonEdgeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// Number of vertex pairs to generate. The lower bound is the generator's
+	// own minimum number of vertex pairs rather than the minimum vertex count
+	// of the unrelated PathGraph generator. The constant is referenced by its
+	// fully-qualified name because this class shares its simple name with the
+	// generator class.
+	private LongParameter vertexPairCount = new LongParameter(this, "vertex_pair_count")
+		.setMinimumValue(org.apache.flink.graph.generator.SingletonEdgeGraph.MINIMUM_VERTEX_PAIR_COUNT);
+
+	// Parallelism handed to the generator; defaults to PARALLELISM_DEFAULT.
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple class name of this input
+	 */
+	@Override
+	public String getName() {
+		return SingletonEdgeGraph.class.getSimpleName();
+	}
+
+	/**
+	 * @return the input name followed by the vertex pair count parameter in
+	 *         parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexPairCount + ")";
+	}
+
+	/**
+	 * Create the graph from the configured vertex pair count, using the
+	 * configured "little_parallelism" for the generator.
+	 *
+	 * @param env the Flink execution environment passed to the generator
+	 * @return the generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue())
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
new file mode 100644
index 0000000..b794f5c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.StarGraph}.
+ */
+public class StarGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+ private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+ .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+ private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+ .setDefaultValue(PARALLELISM_DEFAULT);
+
+ @Override
+ public String getName() {
+ return StarGraph.class.getSimpleName();
+ }
+
+ @Override
+ public String getIdentity() {
+ return getName() + " (" + vertexCount + ")";
+ }
+
+ @Override
+ public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+ return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue())
+ .setParallelism(littleParallelism.getValue().intValue())
+ .generate();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
index b24f8cf..7b291be 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
@@ -39,7 +39,7 @@ public interface Parameterized {
*
* @return command-line documentation string
*/
- String getParameterization();
+ String getUsage();
/**
* Read parameter values from the command-line arguments.
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
index 3b9b80a..5f36ff5 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
@@ -45,7 +45,7 @@ implements Parameterized {
}
@Override
- public String getParameterization() {
+ public String getUsage() {
StrBuilder strBuilder = new StrBuilder();
// print parameters as ordered list
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
new file mode 100644
index 0000000..3e9fd9a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A simple graph has no self-loops (edges where the source and target vertices
+ * are the same) and no duplicate edges. Flink stores an undirected graph as
+ * a directed graph where each undirected edge is represented by a directed
+ * edge in each direction.
+ *
+ * This {@link Parameter} indicates whether to simplify the graph and if the
+ * graph should be directed or undirected.
+ */
+public class Simplify
+implements Parameter<Ordering> {
+
+	/**
+	 * The kind of simplification to apply to a graph.
+	 */
+	public enum Ordering {
+		// leave the graph unchanged
+		NONE,
+
+		// create a simple, directed graph
+		DIRECTED,
+
+		// create a simple, undirected graph
+		UNDIRECTED,
+
+		// create a simple, undirected graph
+		// remove input edges where source < target before symmetrizing the graph
+		UNDIRECTED_CLIP_AND_FLIP,
+	}
+
+	// assigned by configure(); null until configure() has been called
+	private Ordering value;
+
+	/**
+	 * Add this parameter to the list of parameters stored by owner.
+	 *
+	 * @param owner the {@link Parameterized} using this {@link Parameter}
+	 */
+	public Simplify(ParameterizedBase owner) {
+		owner.addParameter(this);
+	}
+
+	/**
+	 * @return command-line documentation string for the "simplify" option
+	 */
+	@Override
+	public String getUsage() {
+		return "[--simplify <directed | undirected [--clip_and_flip]>]";
+	}
+
+	/**
+	 * Read the "simplify" ordering and, for an undirected ordering, the
+	 * optional "clip_and_flip" flag. When "simplify" is absent the value is
+	 * {@link Ordering#NONE}.
+	 *
+	 * @param parameterTool parameter parser
+	 * @throws ProgramParametrizationException if the ordering is neither
+	 *         'directed' nor 'undirected'
+	 */
+	@Override
+	public void configure(ParameterTool parameterTool) {
+		String ordering = parameterTool.get("simplify");
+
+		// The ordering is matched with equalsIgnoreCase rather than by
+		// lower-casing the input: String.toLowerCase() depends on the default
+		// locale and, for example, maps 'I' to a dotless 'i' under a Turkish
+		// locale, which would reject "DIRECTED".
+		if (ordering == null) {
+			value = Ordering.NONE;
+		} else if ("directed".equalsIgnoreCase(ordering)) {
+			value = Ordering.DIRECTED;
+		} else if ("undirected".equalsIgnoreCase(ordering)) {
+			value = parameterTool.has("clip_and_flip") ? Ordering.UNDIRECTED_CLIP_AND_FLIP : Ordering.UNDIRECTED;
+		} else {
+			throw new ProgramParametrizationException(
+				"Expected 'directed' or 'undirected' ordering but received '" + ordering + "'");
+		}
+	}
+
+	/**
+	 * @return the configured ordering
+	 */
+	@Override
+	public Ordering getValue() {
+		return value;
+	}
+
+	/**
+	 * Simplify the given graph based on the configured value.
+	 *
+	 * @param graph input graph
+	 * @param <T> graph key type
+	 * @return output graph
+	 * @throws Exception on error
+	 */
+	public <T extends Comparable<T>> Graph<T, NullValue, NullValue> simplify(Graph<T, NullValue, NullValue> graph)
+			throws Exception {
+
+		switch (value) {
+			case DIRECTED:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.directed.Simplify<T, NullValue, NullValue>());
+				break;
+			case UNDIRECTED:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(false));
+				break;
+			case UNDIRECTED_CLIP_AND_FLIP:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(true));
+				break;
+			default:
+				// NONE: the graph is returned unchanged
+				break;
+		}
+
+		return graph;
+	}
+
+	/**
+	 * Get a short code for the configured ordering: "d" for directed, "u" for
+	 * undirected, the open-o character for undirected with clip-and-flip, and
+	 * the empty string otherwise.
+	 *
+	 * @return short string for the configured ordering
+	 */
+	public String getShortString() {
+		switch (value) {
+			case DIRECTED:
+				return "d";
+			case UNDIRECTED:
+				return "u";
+			case UNDIRECTED_CLIP_AND_FLIP:
+				return "\u0254";
+			default:
+				return "";
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
new file mode 100644
index 0000000..12ae7dc
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link Simplify} maps command-line arguments to the expected
+ * {@link Ordering}.
+ */
+public class SimplifyTest
+extends ParameterTestBase {
+
+	private Simplify parameter;
+
+	@Before
+	public void setup() {
+		super.setup();
+
+		parameter = new Simplify(owner);
+	}
+
+	@Test
+	public void testWithDirected() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "directed"}));
+		Assert.assertEquals(Ordering.DIRECTED, parameter.getValue());
+	}
+
+	@Test
+	public void testWithUndirected() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "undirected"}));
+		Assert.assertEquals(Ordering.UNDIRECTED, parameter.getValue());
+	}
+
+	// the valueless "clip_and_flip" flag selects the clip-and-flip variant
+	@Test
+	public void testWithUndirectedClipAndFlip() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "undirected", "--clip_and_flip"}));
+		Assert.assertEquals(Ordering.UNDIRECTED_CLIP_AND_FLIP, parameter.getValue());
+	}
+
+	// the ordering is matched without regard to case
+	@Test
+	public void testWithMixedCase() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "Directed"}));
+		Assert.assertEquals(Ordering.DIRECTED, parameter.getValue());
+	}
+
+	@Test
+	public void testWithNoParameter() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{}));
+		Assert.assertEquals(Ordering.NONE, parameter.getValue());
+	}
+}