You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/01 15:35:59 UTC

[2/2] flink git commit: [FLINK-6382] [gelly] Support additional types for generated graphs in Gelly examples

[FLINK-6382] [gelly] Support additional types for generated graphs in Gelly examples

The Gelly examples current support IntValue, LongValue, and StringValue
for RMatGraph. Allow transformations and tests for all generated graphs
for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer,
Long, and String.

This closes #3779


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33695781
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33695781
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33695781

Branch: refs/heads/master
Commit: 33695781f9ce2599d18f45de6a465eaefe7d71f4
Parents: d49efbd
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Apr 25 11:36:08 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon May 1 10:35:10 2017 -0400

----------------------------------------------------------------------
 .../flink/graph/drivers/GraphMetrics.java       |   3 +-
 .../flink/graph/drivers/JaccardIndex.java       |   4 +
 .../graph/drivers/input/CompleteGraph.java      |  13 +-
 .../flink/graph/drivers/input/CycleGraph.java   |  13 +-
 .../flink/graph/drivers/input/EmptyGraph.java   |  13 +-
 .../graph/drivers/input/GeneratedGraph.java     | 372 +++++++++++++
 .../drivers/input/GeneratedMultiGraph.java      |  59 +++
 .../flink/graph/drivers/input/GridGraph.java    |  33 +-
 .../graph/drivers/input/HypercubeGraph.java     |  16 +-
 .../flink/graph/drivers/input/PathGraph.java    |  13 +-
 .../flink/graph/drivers/input/RMatGraph.java    |  80 +--
 .../graph/drivers/input/SingletonEdgeGraph.java |  16 +-
 .../flink/graph/drivers/input/StarGraph.java    |  13 +-
 .../drivers/parameter/ChoiceParameter.java      |  23 +-
 .../org/apache/flink/graph/RunnerITCase.java    |   4 +-
 .../flink/graph/drivers/AdamicAdarITCase.java   |  28 +-
 .../drivers/ClusteringCoefficientITCase.java    | 179 +++++--
 .../drivers/ConnectedComponentsITCase.java      | 123 ++++-
 .../drivers/CopyableValueDriverBaseITCase.java  |  52 ++
 .../flink/graph/drivers/DriverBaseITCase.java   |  74 ++-
 .../flink/graph/drivers/EdgeListITCase.java     | 522 ++++++++++++++-----
 .../flink/graph/drivers/GraphMetricsITCase.java | 141 +++--
 .../apache/flink/graph/drivers/HITSITCase.java  |  34 +-
 .../flink/graph/drivers/JaccardIndexITCase.java |  91 +++-
 .../flink/graph/drivers/PageRankITCase.java     |  34 +-
 .../graph/drivers/TriangleListingITCase.java    | 276 ++++++++--
 .../graph/drivers/input/GeneratedGraphTest.java | 203 ++++++++
 .../translators/LongValueToSignedIntValue.java  |   2 +
 .../LongValueToUnsignedIntValue.java            |   6 +-
 .../flink/graph/generator/CompleteGraph.java    |   7 +-
 .../apache/flink/graph/generator/GridGraph.java |   2 +-
 .../apache/flink/graph/generator/RMatGraph.java |   2 +-
 .../apache/flink/graph/generator/StarGraph.java |   2 +-
 .../flink/graph/library/link_analysis/HITS.java |   2 -
 .../graph/library/similarity/JaccardIndex.java  |  50 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java |   2 +-
 .../LongValueToSignedIntValueTest.java          |   4 +-
 .../LongValueToUnsignedIntValueTest.java        |   6 +-
 38 files changed, 2054 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index cc5a894..aef8f9f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.drivers.output.Hash;
 import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
 import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
-import org.apache.flink.types.CopyableValue;
 
 /**
  * Driver for directed and undirected graph metrics analytics.
@@ -36,7 +35,7 @@ import org.apache.flink.types.CopyableValue;
  * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
  * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
  */
-public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class GraphMetrics<K extends Comparable<K>, VV, EV>
 extends ParameterizedBase
 implements Driver<K, VV, EV>, Hash, Print {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index ae0d5f8..d5b2ae3 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Hash;
 import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.types.CopyableValue;
@@ -57,6 +58,8 @@ implements CSV, Hash, Print {
 	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
+	private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");
+
 	@Override
 	public String getName() {
 		return this.getClass().getSimpleName();
@@ -88,6 +91,7 @@ implements CSV, Hash, Print {
 			.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
 				.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
 				.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
+				.setMirrorResults(mirrorResults.getValue())
 				.setLittleParallelism(lp));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
index c31c5a8..dc85df4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,7 @@ import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUN
  * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}.
  */
 public class CompleteGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
@@ -48,11 +46,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexCount.getValue() + ")";
+		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
index df66dab..9ef67c3 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,7 @@ import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
  * Generate a {@link org.apache.flink.graph.generator.CycleGraph}.
  */
 public class CycleGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
@@ -48,11 +46,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexCount + ")";
+		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
index 794317b..e7b5942 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -31,8 +30,7 @@ import static org.apache.flink.graph.generator.EmptyGraph.MINIMUM_VERTEX_COUNT;
  * Generate an {@link org.apache.flink.graph.generator.EmptyGraph}.
  */
 public class EmptyGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
@@ -44,11 +42,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexCount + ")";
+		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue())
 			.generate();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
new file mode 100644
index 0000000..610722c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * Base class for generated graphs.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class GeneratedGraph<K>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+	private static final String BYTE = "byte";
+	private static final String NATIVE_BYTE = "nativeByte";
+
+	private static final String SHORT = "short";
+	private static final String NATIVE_SHORT = "nativeShort";
+
+	private static final String CHAR = "char";
+	private static final String NATIVE_CHAR = "nativeChar";
+
+	private static final String INTEGER = "integer";
+	private static final String NATIVE_INTEGER = "nativeInteger";
+
+	private static final String LONG = "long";
+	private static final String NATIVE_LONG = "nativeLong";
+
+	private static final String STRING = "string";
+	private static final String NATIVE_STRING = "nativeString";
+
+	private ChoiceParameter type = new ChoiceParameter(this, "type")
+		.setDefaultValue(INTEGER)
+		.addChoices(LONG, STRING)
+		.addHiddenChoices(BYTE, NATIVE_BYTE, SHORT, NATIVE_SHORT, CHAR, NATIVE_CHAR, NATIVE_INTEGER, NATIVE_LONG, NATIVE_STRING);
+
+	/**
+	 * The vertex count is verified to be no greater than the capacity of the
+	 * selected data type. All vertices must be counted even if skipped or
+	 * unused when generating graph edges.
+	 *
+	 * @return number of vertices configured for the graph
+	 */
+	protected abstract long vertexCount();
+
+	/**
+	 * Generate the graph as configured.
+	 *
+	 * @param env Flink execution environment
+	 * @return generated graph
+	 * @throws Exception on error
+	 */
+	protected abstract Graph<K, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception;
+
+	/**
+	 * Get the name of the type.
+	 *
+	 * @return name of the type
+	 */
+	protected String getTypeName() {
+		return WordUtils.capitalize(type.getValue());
+	}
+
+	@Override
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env)
+			throws Exception {
+		long maxVertexCount = Long.MAX_VALUE;
+		TranslateFunction translator = null;
+
+		switch (type.getValue()) {
+			case BYTE:
+				maxVertexCount = LongValueToUnsignedByteValue.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedByteValue();
+				break;
+
+			case NATIVE_BYTE:
+				maxVertexCount = LongValueToUnsignedByte.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedByte();
+				break;
+
+			case SHORT:
+				maxVertexCount = LongValueToUnsignedShortValue.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedShortValue();
+				break;
+
+			case NATIVE_SHORT:
+				maxVertexCount = LongValueToUnsignedShort.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedShort();
+				break;
+
+			case CHAR:
+				maxVertexCount = LongValueToCharValue.MAX_VERTEX_COUNT;
+				translator = new LongValueToCharValue();
+				break;
+
+			case NATIVE_CHAR:
+				maxVertexCount = LongValueToChar.MAX_VERTEX_COUNT;
+				translator = new LongValueToChar();
+				break;
+
+			case INTEGER:
+				maxVertexCount = LongValueToUnsignedIntValue.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedIntValue();
+				break;
+
+			case NATIVE_INTEGER:
+				maxVertexCount = LongValueToUnsignedInt.MAX_VERTEX_COUNT;
+				translator = new LongValueToUnsignedInt();
+				break;
+
+			case LONG:
+				break;
+
+			case NATIVE_LONG:
+				translator = new LongValueToLong();
+				break;
+
+			case STRING:
+				translator = new LongValueToStringValue();
+				break;
+
+			case NATIVE_STRING:
+				translator = new LongValueToString();
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+		}
+
+		long vertexCount = vertexCount();
+		if (vertexCount > maxVertexCount) {
+			throw new ProgramParametrizationException("Vertex count '" + vertexCount +
+				"' must be no greater than " + maxVertexCount +  " for type '" + type.getValue() + "'.");
+		}
+
+		Graph<K, NullValue, NullValue> graph = generate(env);
+
+		if (translator != null) {
+			graph = (Graph<K, NullValue, NullValue>) graph.run(new TranslateGraphIds(translator));
+		}
+
+		return graph;
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link ByteValue}.
+	 *
+	 * Throws {@link RuntimeException} for byte overflow.
+	 */
+	static class LongValueToUnsignedByteValue
+	implements TranslateFunction<LongValue, ByteValue> {
+		public static final long MAX_VERTEX_COUNT = 1L << 8;
+
+		@Override
+		public ByteValue translate(LongValue value, ByteValue reuse)
+				throws Exception {
+			if (reuse == null) {
+				reuse = new ByteValue();
+			}
+
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to byte.");
+			} else {
+				reuse.setValue((byte) (l & (MAX_VERTEX_COUNT - 1)));
+			}
+
+			return reuse;
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link Byte}.
+	 *
+	 * Throws {@link RuntimeException} for byte overflow.
+	 */
+	static class LongValueToUnsignedByte
+	implements TranslateFunction<LongValue, Byte> {
+		public static final long MAX_VERTEX_COUNT = 1L << 8;
+
+		@Override
+		public Byte translate(LongValue value, Byte reuse)
+				throws Exception {
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to byte.");
+			}
+
+			return (byte) (l & (MAX_VERTEX_COUNT - 1));
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link ShortValue}.
+	 *
+	 * Throws {@link RuntimeException} for short overflow.
+	 */
+	static class LongValueToUnsignedShortValue
+	implements TranslateFunction<LongValue, ShortValue> {
+		public static final long MAX_VERTEX_COUNT = 1L << 16;
+
+		@Override
+		public ShortValue translate(LongValue value, ShortValue reuse)
+				throws Exception {
+			if (reuse == null) {
+				reuse = new ShortValue();
+			}
+
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to short.");
+			} else {
+				reuse.setValue((short) (l & (MAX_VERTEX_COUNT - 1)));
+			}
+
+			return reuse;
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link Short}.
+	 *
+	 * Throws {@link RuntimeException} for short overflow.
+	 */
+	static class LongValueToUnsignedShort
+	implements TranslateFunction<LongValue, Short> {
+		public static final long MAX_VERTEX_COUNT = 1L << 16;
+
+		@Override
+		public Short translate(LongValue value, Short reuse)
+				throws Exception {
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to short.");
+			}
+
+			return (short) (l & (MAX_VERTEX_COUNT - 1));
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link CharValue}.
+	 *
+	 * Throws {@link RuntimeException} for char overflow.
+	 */
+	static class LongValueToCharValue
+	implements TranslateFunction<LongValue, CharValue> {
+		public static final long MAX_VERTEX_COUNT = 1L << 16;
+
+		@Override
+		public CharValue translate(LongValue value, CharValue reuse)
+				throws Exception {
+			if (reuse == null) {
+				reuse = new CharValue();
+			}
+
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to char.");
+			} else {
+				reuse.setValue((char) (l & (MAX_VERTEX_COUNT - 1)));
+			}
+
+			return reuse;
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@code Character}.
+	 *
+	 * Throws {@link RuntimeException} for char overflow.
+	 */
+	static class LongValueToChar
+	implements TranslateFunction<LongValue, Character> {
+		public static final long MAX_VERTEX_COUNT = 1L << 16;
+
+		@Override
+		public Character translate(LongValue value, Character reuse)
+				throws Exception {
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to char.");
+			}
+
+			return (char) (l & (MAX_VERTEX_COUNT - 1));
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link Integer}.
+	 *
+	 * Throws {@link RuntimeException} for integer overflow.
+	 */
+	static class LongValueToUnsignedInt
+	implements TranslateFunction<LongValue, Integer> {
+		public static final long MAX_VERTEX_COUNT = 1L << 32;
+
+		@Override
+		public Integer translate(LongValue value, Integer reuse)
+				throws Exception {
+			long l = value.getValue();
+
+			if (l < 0 || l >= MAX_VERTEX_COUNT) {
+				throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
+			}
+
+			return (int) (l & (MAX_VERTEX_COUNT - 1));
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link Long}.
+	 */
+	static class LongValueToLong
+	implements TranslateFunction<LongValue, Long> {
+		@Override
+		public Long translate(LongValue value, Long reuse)
+				throws Exception {
+			return value.getValue();
+		}
+	}
+
+	/**
+	 * Translate {@link LongValue} to {@link String}.
+	 */
+	static class LongValueToString
+	implements TranslateFunction<LongValue, String> {
+		@Override
+		public String translate(LongValue value, String reuse)
+				throws Exception {
+			return Long.toString(value.getValue());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
new file mode 100644
index 0000000..c0a16a8
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Base class for graph generators which may create duplicate edges.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class GeneratedMultiGraph<K extends Comparable<K>>
+extends GeneratedGraph<K> {
+
+	private Simplify simplify = new Simplify(this);
+
+	/**
+	 * Get the short string representation of the simplify transformation.
+	 *
+	 * @return short string representation of the simplify transformation
+	 */
+	protected String getSimplifyShortString() {
+		return simplify.getShortString();
+	}
+
+	/**
+	 * Generate the graph as configured.
+	 *
+	 * @param env Flink execution environment
+	 * @return input graph
+	 */
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env)
+			throws Exception {
+		Graph<K, NullValue, NullValue> graph = super.create(env);
+
+		// simplify after the translation to improve the performance of the
+		// simplify operators by processing smaller data types
+		return simplify.simplify(graph);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index d502215..b41b86e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -38,8 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Generate a {@link org.apache.flink.graph.generator.GridGraph}.
  */
 public class GridGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private static final String PREFIX = "dim";
 
@@ -67,10 +66,10 @@ implements Input<LongValue, NullValue, NullValue> {
 		Map<Integer, String> dimensionMap = new TreeMap<>();
 
 		// first parse all dimensions into a sorted map
-		for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {
-			if (entry.getKey().startsWith(PREFIX)) {
-				int dimensionId = Integer.parseInt(entry.getKey().substring(PREFIX.length()));
-				dimensionMap.put(dimensionId, entry.getValue());
+		for (String key : parameterTool.toMap().keySet()) {
+			if (key.startsWith(PREFIX)) {
+				int dimensionId = Integer.parseInt(key.substring(PREFIX.length()));
+				dimensionMap.put(dimensionId, parameterTool.get(key));
 			}
 		}
 
@@ -82,11 +81,27 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + dimensions + ")";
+		return getTypeName() + " " + getName() + " (" + dimensions + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		// in Java 8 use Math.multiplyExact(long, long)
+		BigInteger vertexCount = BigInteger.ONE;
+		for (Dimension dimension : dimensions) {
+			vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size));
+		}
+
+		if (vertexCount.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
+			throw new ProgramParametrizationException("Number of vertices in grid graph '" + vertexCount +
+				"' is greater than Long.MAX_VALUE.");
+		}
+
+		return vertexCount.longValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env);
 
 		for (Dimension dimension : dimensions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
index 3b87b00..8d1e8b1 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,11 +31,11 @@ import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS
  * Generate a {@link org.apache.flink.graph.generator.HypercubeGraph}.
  */
 public class HypercubeGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter dimensions = new LongParameter(this, "dimensions")
-		.setMinimumValue(MINIMUM_DIMENSIONS);
+		.setMinimumValue(MINIMUM_DIMENSIONS)
+		.setMaximumValue(63);
 
 	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
 		.setDefaultValue(PARALLELISM_DEFAULT);
@@ -48,11 +47,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + dimensions + ")";
+		return getTypeName() + " " + getName() + " (" + dimensions + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return 1L << dimensions.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
index 968628c..9e02056 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,7 @@ import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
  * Generate a {@link org.apache.flink.graph.generator.PathGraph}.
  */
 public class PathGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
@@ -48,11 +46,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexCount + ")";
+		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index d64534b..adee1eb 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -18,20 +18,12 @@
 
 package org.apache.flink.graph.drivers.input;
 
-import org.apache.commons.lang3.text.WordUtils;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.drivers.parameter.BooleanParameter;
-import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
-import org.apache.flink.graph.drivers.parameter.Simplify;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.IntValue;
@@ -46,22 +38,9 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * or {@link StringValue} keys.
  *
  * @see org.apache.flink.graph.generator.RMatGraph
- *
- * @param <K> key type
  */
-public class RMatGraph<K extends Comparable<K>>
-extends ParameterizedBase
-implements Input<K, NullValue, NullValue> {
-
-	private static final String INTEGER = "integer";
-
-	private static final String LONG = "long";
-
-	private static final String STRING = "string";
-
-	private ChoiceParameter type = new ChoiceParameter(this, "type")
-		.setDefaultValue(INTEGER)
-		.addChoices(LONG, STRING);
+public class RMatGraph
+extends GeneratedMultiGraph<LongValue> {
 
 	// generate graph with 2^scale vertices
 	private LongParameter scale = new LongParameter(this, "scale")
@@ -99,20 +78,23 @@ implements Input<K, NullValue, NullValue> {
 	private LongParameter seed = new LongParameter(this, "seed")
 		.setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED);
 
-	private Simplify simplify = new Simplify(this);
-
 	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
 	public String getName() {
-		return RMatGraph.class.getSimpleName();
+		return this.getClass().getSimpleName();
 	}
 
 	@Override
 	public String getIdentity() {
-		return getName() + WordUtils.capitalize(type.getValue()) +
-			" (s" + scale + "e" + edgeFactor + simplify.getShortString() + ")";
+		return getTypeName() + " " + getName() +
+			" (s" + scale + "e" + edgeFactor + getSimplifyShortString() + ")";
+	}
+
+	@Override
+	protected long vertexCount() {
+		return 1L << scale.getValue();
 	}
 
 	/**
@@ -121,7 +103,7 @@ implements Input<K, NullValue, NullValue> {
 	 * @param env Flink execution environment
 	 * @return input graph
 	 */
-	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 
 		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
@@ -129,49 +111,11 @@ implements Input<K, NullValue, NullValue> {
 		long vertexCount = 1L << scale.getValue();
 		long edgeCount = vertexCount * edgeFactor.getValue();
 
-		Graph<LongValue, NullValue, NullValue> rmatGraph = new org.apache.flink.graph.generator.RMatGraph<>(
+		return new org.apache.flink.graph.generator.RMatGraph<>(
 				env, rnd, vertexCount, edgeCount)
 			.setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue())
 			.setNoise(noiseEnabled.getValue(), noise.getValue().floatValue())
 			.setParallelism(lp)
 			.generate();
-
-		Graph<K, NullValue, NullValue> graph;
-
-		switch (type.getValue()) {
-			case INTEGER:
-				if (scale.getValue() > 32) {
-					throw new ProgramParametrizationException(
-						"Scale '" + scale.getValue() + "' must be no greater than 32 for type 'integer'");
-				}
-				graph = (Graph<K, NullValue, NullValue>) rmatGraph
-					.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()));
-				break;
-
-			case LONG:
-				if (scale.getValue() > 64) {
-					throw new ProgramParametrizationException(
-						"Scale '" + scale.getValue() + "' must be no greater than 64 for type 'long'");
-				}
-				graph = (Graph<K, NullValue, NullValue>) rmatGraph;
-				break;
-
-			case STRING:
-				// scale bound is same as LONG since keys are generated as LongValue
-				if (scale.getValue() > 64) {
-					throw new ProgramParametrizationException(
-						"Scale '" + scale.getValue() + "' must be no greater than 64 for type 'string'");
-				}
-				graph = (Graph<K, NullValue, NullValue>) rmatGraph
-					.run(new TranslateGraphIds<LongValue, StringValue, NullValue, NullValue>(new LongValueToStringValue()));
-				break;
-
-			default:
-				throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
-		}
-
-		// simplify *after* the translation from LongValue to IntValue or
-		// StringValue to improve the performance of the simplify operators
-		return simplify.simplify(graph);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
index 502826f..65e0196 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,11 +31,11 @@ import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
  * Generate a {@link org.apache.flink.graph.generator.SingletonEdgeGraph}.
  */
 public class SingletonEdgeGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexPairCount = new LongParameter(this, "vertex_pair_count")
-		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+		.setMinimumValue(MINIMUM_VERTEX_COUNT)
+		.setMaximumValue(1L << 62);
 
 	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
 		.setDefaultValue(PARALLELISM_DEFAULT);
@@ -48,11 +47,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexPairCount + ")";
+		return getTypeName() + " " + getName() + " (" + vertexPairCount + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return 2 * vertexPairCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
index b794f5c..b37dc49 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.drivers.input;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,7 @@ import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
  * Generate a {@link org.apache.flink.graph.generator.StarGraph}.
  */
 public class StarGraph
-extends ParameterizedBase
-implements Input<LongValue, NullValue, NullValue> {
+extends GeneratedGraph<LongValue> {
 
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
@@ -48,11 +46,16 @@ implements Input<LongValue, NullValue, NullValue> {
 
 	@Override
 	public String getIdentity() {
-		return getName() + " (" + vertexCount + ")";
+		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}
 
 	@Override
-	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue())
 			.setParallelism(littleParallelism.getValue().intValue())
 			.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
index b239b93..f1b716d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.graph.drivers.parameter;
 
-import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.util.Preconditions;
@@ -37,6 +37,8 @@ extends SimpleParameter<String> {
 
 	private List<String> choices = new ArrayList<>();
 
+	private List<String> hiddenChoices = new ArrayList<>();
+
 	/**
 	 * Set the parameter name and add this parameter to the list of parameters
 	 * stored by owner.
@@ -71,6 +73,18 @@ extends SimpleParameter<String> {
 		return this;
 	}
 
+	/**
+	 * Add additional hidden choices. This function can be called multiple
+	 * times. These choices will not be printed in the usage string.
+	 *
+	 * @param hiddenChoices additional hidden choices
+	 * @return this
+	 */
+	public ChoiceParameter addHiddenChoices(String... hiddenChoices) {
+		Collections.addAll(this.hiddenChoices, hiddenChoices);
+		return this;
+	}
+
 	@Override
 	public String getUsage() {
 		String option = new StrBuilder()
@@ -107,6 +121,13 @@ extends SimpleParameter<String> {
 			}
 		}
 
+		for (String choice : hiddenChoices) {
+			if (choice.equals(selected)) {
+				this.value = selected;
+				return;
+			}
+		}
+
 		throw new ProgramParametrizationException(
 			"Selection '" + selected + "' for option '" + name + "' is not in choices '[" + StringUtils.join(choices, ", ") + "]'");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
index a48fdf1..f93dc31 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
@@ -33,8 +33,8 @@ extends DriverBaseITCase {
 	@Rule
 	public ExpectedException thrown = ExpectedException.none();
 
-	public RunnerITCase(TestExecutionMode mode) {
-		super(mode);
+	public RunnerITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
index 400c241..2548263 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
@@ -18,17 +18,28 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class AdamicAdarITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-	public AdamicAdarITCase(TestExecutionMode mode) {
-		super(mode);
+	public AdamicAdarITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String output, String... additionalParameters) {
+		String[] parameters = new String[] {
+			"--algorithm", "AdamicAdar",
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "undirected",
+			"--output", output};
+
+		return ArrayUtils.addAll(parameters, additionalParameters);
 	}
 
 	@Test
@@ -42,11 +53,12 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testPrintWithRMatIntegerGraph() throws Exception {
+	public void testPrintWithRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
 		expectedCount(
-			new String[]{"--algorithm", "AdamicAdar",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "print"},
-			221628);
+			parameters(7, "print"),
+			5694);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
index f215b91..86eee01 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
@@ -19,16 +19,25 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class ClusteringCoefficientITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-	public ClusteringCoefficientITCase(TestExecutionMode mode) {
-		super(mode);
+	public ClusteringCoefficientITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String order, String simplify, String output) {
+		return new String[] {
+			"--algorithm", "ClusteringCoefficient", "--order", order,
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", simplify,
+			"--output", output};
 	}
 
 	@Test
@@ -42,48 +51,148 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+	public void testHashWithSmallDirectedRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x0000003621c62ca1L;
+				break;
+
+			case "long":
+				checksum = 0x0000003b74c6719bL;
+				break;
+
+			case "string":
+				checksum = 0x0000003ab67abea8L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
 		String expected = "\n" +
-			"ChecksumHashCode 0x000001c0409df6c0, count 902\n" +
-			"triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
-			"vertex count: 902, average clustering coefficient: 0.32943748[0-9]+\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "hash"},
-			expected);
+			new Checksum(117, checksum) + "\n" +
+			"triplet count: 29286, triangle count: 11466, global clustering coefficient: 0.39151813[0-9]+\n" +
+			"vertex count: 117, average clustering coefficient: 0.45125697[0-9]+\n";
+
+		expectedOutput(parameters(7, "directed", "directed", "hash"), expected);
 	}
 
 	@Test
-	public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "print"},
-			904);
+	public void testHashWithSmallUndirectedRMatGraph() throws Exception {
+		long directed_checksum;
+		long undirected_checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "char":
+			case "integer":
+				directed_checksum = 0x0000003875b38c43L;
+				undirected_checksum = 0x0000003c20344c75L;
+				break;
+
+			case "long":
+				directed_checksum = 0x0000003671970c59L;
+				undirected_checksum = 0x0000003939645d8cL;
+				break;
+
+			case "string":
+				directed_checksum = 0x0000003be109a770L;
+				undirected_checksum = 0x0000003b8c98d14aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		String expected = "\n" +
+			"triplet count: 29286, triangle count: 11466, global clustering coefficient: 0.39151813[0-9]+\n" +
+			"vertex count: 117, average clustering coefficient: 0.57438679[0-9]+\n";
+
+		expectedOutput(parameters(7, "directed", "undirected", "hash"),
+			"\n" + new Checksum(117, directed_checksum) + expected);
+		expectedOutput(parameters(7, "undirected", "undirected", "hash"),
+			"\n" + new Checksum(117, undirected_checksum) + expected);
 	}
 
 	@Test
-	public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+	public void testHashWithLargeDirectedRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+				return;
+
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x0000067a9d18e7f3L;
+				break;
+
+			case "long":
+				checksum = 0x00000694a90ee6d4L;
+				break;
+
+			case "string":
+				checksum = 0x000006893e3b314fL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
 		String expected = "\n" +
-			"ChecksumHashCode 0x000001ccf8c45fdb, count 902\n" +
-			"triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
-			"vertex count: 902, average clustering coefficient: 0.42173070[0-9]+\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+			new Checksum(3349, checksum) + "\n" +
+			"triplet count: 9276207, triangle count: 1439454, global clustering coefficient: 0.15517700[0-9]+\n" +
+			"vertex count: 3349, average clustering coefficient: 0.24571815[0-9]+\n";
+
+		expectedOutput(parameters(12, "directed", "directed", "hash"), expected);
 	}
 
 	@Test
-	public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "print"},
-			904);
+	public void testHashWithLargeUndirectedRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long directed_checksum;
+		long undirected_checksum;
+		switch (idType) {
+			case "byte":
+				return;
+
+			case "short":
+			case "char":
+			case "integer":
+				directed_checksum = 0x00000681fad1587eL;
+				undirected_checksum = 0x0000068713b3b7f1L;
+				break;
+
+			case "long":
+				directed_checksum = 0x000006928a6301b1L;
+				undirected_checksum = 0x000006a399edf0e6L;
+				break;
+
+			case "string":
+				directed_checksum = 0x000006749670a2f7L;
+				undirected_checksum = 0x0000067f19c6c4d5L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		String expected = "\n" +
+			"triplet count: 9276207, triangle count: 1439454, global clustering coefficient: 0.15517700[0-9]+\n" +
+			"vertex count: 3349, average clustering coefficient: 0.33029442[0-9]+\n";
+
+		expectedOutput(parameters(12, "directed", "undirected", "hash"),
+			"\n" + new Checksum(3349, directed_checksum) + expected);
+		expectedOutput(parameters(12, "undirected", "undirected", "hash"),
+			"\n" + new Checksum(3349, undirected_checksum) + expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
index b91abb3..95f0c66 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +29,16 @@ import org.junit.runners.Parameterized;
 public class ConnectedComponentsITCase
 extends DriverBaseITCase {
 
-	public ConnectedComponentsITCase(TestExecutionMode mode) {
-		super(mode);
+	public ConnectedComponentsITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String output) {
+		return new String[] {
+			"--algorithm", "ConnectedComponents",
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "undirected",
+				"--edge_factor", "1", "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+			"--output", output};
 	}
 
 	@Test
@@ -42,24 +52,101 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testHashWithRMatIntegerGraph() throws Exception {
-		String expected = "\\nChecksumHashCode 0x0000000000cdc7e7, count 838\\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "ConnectedComponents",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
-					"--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
-				"--output", "hash"},
-			expected);
+	public void testHashWithSmallRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000000033e88L;
+				break;
+
+			case "long":
+				checksum = 0x0000000000057848L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000000254a4c3L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(parameters(7, "hash"), 106, checksum);
 	}
 
 	@Test
-	public void testPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "ConnectedComponents",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
-					"--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
-				"--output", "print"},
-			838);
+	public void testHashWithLargeRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+				return;
+
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000003094ffba2L;
+				break;
+
+			case "long":
+				checksum = 0x000000030b68e522L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00001839ad14edb1L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(parameters(15, "hash"), 25572, checksum);
+	}
+
+	@Test
+	public void testPrintWithSmallRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "integer":
+			case "nativeInteger":
+			case "long":
+			case "nativeLong":
+				checksum = 0x00000024edd0568dL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000232d8bf58dL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedOutputChecksum(parameters(7, "print"), new Checksum(106, checksum));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/CopyableValueDriverBaseITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/CopyableValueDriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/CopyableValueDriverBaseITCase.java
new file mode 100644
index 0000000..bc09820
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/CopyableValueDriverBaseITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Base class for drivers requiring the key ID to implement
+ * {@code CopyableValue}. This class overrides {@link DriverBaseITCase}
+ * to restrict the tested ID types.
+ */
+public abstract class CopyableValueDriverBaseITCase
+extends DriverBaseITCase {
+
+	protected CopyableValueDriverBaseITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	// limit tests to types supporting CopyableValue
+	@Parameterized.Parameters(name = "ID type = {0}, Execution mode = {1}")
+	public static Collection<Object[]> executionModes() {
+		List<Object[]> executionModes = new ArrayList<>();
+
+		for (String idType : new String[] {"byte", "short", "char", "integer", "long", "string"}) {
+			for (TestExecutionMode executionMode : TestExecutionMode.values()) {
+				executionModes.add(new Object[] {idType, executionMode});
+			}
+		}
+
+		return executionModes;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
index d19ca97..5b0e42e 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.flink.graph.Runner;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.hamcrest.Description;
@@ -31,27 +32,55 @@ import org.junit.runners.Parameterized;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.regex.Pattern;
 
+/**
+ *
+ */
 public abstract class DriverBaseITCase
 extends MultipleProgramsTestBase {
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
-	protected DriverBaseITCase(TestExecutionMode mode) {
+	protected final String idType;
+
+	protected DriverBaseITCase(String idType, TestExecutionMode mode) {
 		super(mode);
+
+		this.idType = idType;
 	}
 
-	// extend MultipleProgramsTestBase default to include object reuse mode
-	@Parameterized.Parameters(name = "Execution mode = {0}")
+	@Parameterized.Parameters(name = "ID type = {0}, Execution mode = {1}")
 	public static Collection<Object[]> executionModes() {
-		return Arrays.asList(
-			new Object[] { TestExecutionMode.CLUSTER },
-			new Object[] { TestExecutionMode.CLUSTER_OBJECT_REUSE },
-			new Object[] { TestExecutionMode.COLLECTION });
+		List<Object[]> executionModes = new ArrayList<>();
+
+		for (String idType : new String[] {"byte", "nativeByte", "short", "nativeShort", "char", "nativeChar",
+								"integer", "nativeInteger", "long", "nativeLong", "string", "nativeString"}) {
+			for (TestExecutionMode executionMode : TestExecutionMode.values()) {
+				executionModes.add(new Object[] {idType, executionMode});
+			}
+		}
+
+		return executionModes;
+	}
+
+	/**
+	 * Simpler variant of {@link #expectedOutput(String[], String)}
+	 * that converts the {@link Checksum} to a string and ignores
+	 * leading and trailing newlines.
+	 *
+	 * @param parameters algorithm, input, and output arguments
+	 * @param expectedCount expected number of records
+	 * @param expectedChecksum expected checksum over records
+	 * @throws Exception on error
+	 */
+	protected void expectedChecksum(String[] parameters, long expectedCount, long expectedChecksum) throws Exception {
+		Checksum checksum = new Checksum(expectedCount, expectedChecksum);
+		expectedOutput(parameters, "\n*" + checksum.toString() + "\n*");
 	}
 
 	/**
@@ -87,6 +116,33 @@ extends MultipleProgramsTestBase {
 	}
 
 	/**
+	 * Simpler variant of {@link #expectedOutput(String[], String)}
+	 * that sums the hashCode() of each line of output.
+	 *
+	 * @param parameters algorithm, input, and output arguments
+	 * @param expected expected checksum over lines of output
+	 * @throws Exception on error
+	 */
+	protected void expectedOutputChecksum(String[] parameters, Checksum expected) throws Exception {
+		String output = getSystemOutput(parameters);
+
+		long count = 0;
+		long checksum = 0;
+
+		for (String line : output.split(System.getProperty("line.separator"))) {
+			if (line.length() > 0) {
+				count++;
+
+				// convert 32-bit integer to non-negative long
+				checksum += line.hashCode() & 0xffffffffL;
+			}
+		}
+
+		Assert.assertEquals(expected.getCount(), count);
+		Assert.assertEquals(expected.getChecksum(), checksum);
+	}
+
+	/**
 	 * Executes the driver with the provided arguments and compares the
 	 * exception and exception method with the given class and regular
 	 * expression.
@@ -96,7 +152,7 @@ extends MultipleProgramsTestBase {
 	 * @param exception expected exception
 	 * @throws Exception on error when not matching exception
 	 */
-	protected void expectedOutputFromException(String[] parameters, String expected,Class<? extends Throwable> exception) throws Exception {
+	protected void expectedOutputFromException(String[] parameters, String expected, Class<? extends Throwable> exception) throws Exception {
 		expectedException.expect(exception);
 		expectedException.expectMessage(RegexMatcher.matchesRegex(expected));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
index d9cac8b..f566218 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +30,17 @@ import org.junit.runners.Parameterized;
 public class EdgeListITCase
 extends DriverBaseITCase {
 
-	public EdgeListITCase(TestExecutionMode mode) {
-		super(mode);
+	public EdgeListITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(String input, String output, String... additionalParameters) {
+		String[] parameters = new String[] {
+			"--algorithm", "EdgeList",
+			"--input", input, "--type", idType,
+			"--output", output};
+
+		return ArrayUtils.addAll(parameters, additionalParameters);
 	}
 
 	@Test
@@ -43,198 +55,448 @@ extends DriverBaseITCase {
 
 	@Test
 	public void testHashWithCompleteGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000000006788c22, count 1722\n";
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x000000000217bbe2L;
+				break;
+
+			case "long":
+				checksum = 0x0000000006788c22L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000007ddfd962L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("CompleteGraph", "hash", "--vertex_count", "42"),
+			1722, checksum);
+	}
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "CompleteGraph", "--vertex_count", "42",
-				"--output", "hash"},
-			expected);
+	@Test
+	public void testPrintWithCompleteGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("CompleteGraph", "print", "--vertex_count", "42"),
+			new Checksum(1722, 0x0000031109a0c398L));
 	}
 
 	@Test
 	public void testHashWithCycleGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x000000000050cea4, count 84\n";
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000001a2224L;
+				break;
+
+			case "long":
+				checksum = 0x000000000050cea4L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000000623e524L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("CycleGraph", "hash", "--vertex_count", "42"),
+			84, checksum);
+	}
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "CycleGraph", "--vertex_count", "42",
-				"--output", "hash"},
-			expected);
+	@Test
+	public void testPrintWithCycleGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("CycleGraph", "print", "--vertex_count", "42"),
+			new Checksum(84, 0x000000272a136fcaL));
 	}
 
 	@Test
 	public void testHashWithEmptyGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000000000000000, count 0\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "EmptyGraph", "--vertex_count", "42",
-				"--output", "hash"},
-			expected);
+		expectedChecksum(
+			parameters("EmptyGraph", "hash", "--vertex_count", "42"),
+			0, 0x0000000000000000);
 	}
 
 	@Test
 	public void testHashWithGridGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000000357d33a6, count 2990\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "GridGraph", "--dim0", "5:true", "--dim1", "8:false", "--dim2", "13:true",
-				"--output", "hash"},
-			expected);
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000001ca34aL;
+				break;
+
+			case "long":
+				checksum = 0x000000000071408aL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00000000081ee80aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("GridGraph", "hash", "--dim0", "2:true", "--dim1", "3:false", "--dim2", "5:true"),
+			130, checksum);
 	}
 
 	@Test
-	public void testHashWithHypercubeGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000000014a72800, count 2048\n";
+	public void testPrintWithGridGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "HypercubeGraph", "--dimensions", "8",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("GridGraph", "print", "--dim0", "2:true", "--dim1", "3:false", "--dim2", "5:true"),
+			new Checksum(130, 0x00000033237d24eeL));
 	}
 
 	@Test
-	public void testHashWithPathGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000000004ee21a, count 82\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "PathGraph", "--vertex_count", "42",
-				"--output", "hash"},
-			expected);
+	public void testHashWithHypercubeGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000035df180L;
+				break;
+
+			case "long":
+				checksum = 0x0000000005a52180L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x0000000273474480L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("HypercubeGraph", "hash", "--dimensions", "7"),
+			896, checksum);
 	}
 
 	@Test
-	public void testHashWithRMatIntegerGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000000ed469103, count 16384\n";
+	public void testPrintWithHypercubeGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "integer",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("HypercubeGraph", "print", "--dimensions", "7"),
+			new Checksum(896, 0x000001f243ee33b2L));
 	}
 
 	@Test
-	public void testHashWithRMatIntegerDirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000000c53bfc9b, count 12009\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "hash"},
-			expected);
+	public void testHashWithPathGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000001982daL;
+				break;
+
+			case "long":
+				checksum = 0x00000000004ee21aL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00000000060a065aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("PathGraph", "hash", "--vertex_count", "42"),
+			82, checksum);
 	}
 
 	@Test
-	public void testHashWithRMatIntegerUndirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000001664eb9e4, count 20884\n";
+	public void testPrintWithPathGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("PathGraph", "print", "--vertex_count", "42"),
+			new Checksum(82, 0x000000269be2d4c2L));
 	}
 
 	@Test
-	public void testHashWithRMatLongGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000000116ee9103, count 16384\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "long",
-				"--output", "hash"},
-			expected);
+	public void testHashWithRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000003bf67f7L;
+				break;
+
+			case "long":
+				checksum = 0x0000000008f467f7L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00000001660861bdL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("RMatGraph", "hash", "--scale", "7"),
+			2048, checksum);
 	}
 
 	@Test
-	public void testPrintWithRMatLongGraph() throws Exception {
+	public void testPrintWithRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedCount(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "long",
-				"--output", "print"},
-			16384);
+		expectedOutputChecksum(
+			parameters("RMatGraph", "print", "--scale", "7"),
+			new Checksum(2048, 0x000002f737939f05L));
 	}
 
 	@Test
-	public void testHashWithRMatLongDirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000000e3c4643b, count 12009\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "long", "--simplify", "directed",
-				"--output", "hash"},
-			expected);
+	public void testHashWithDirectedRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000029aafb3L;
+				break;
+
+			case "long":
+				checksum = 0x000000000592e9b3L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000011b079691L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("RMatGraph", "hash", "--scale", "7", "--simplify", "directed"),
+			1168, checksum);
 	}
 
 	@Test
-	public void testHashWithRMatLongUndirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x000000019b67ae64, count 20884\n";
+	public void testPrintWithDirectedRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "long", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("RMatGraph", "print", "--scale", "7", "--simplify", "directed"),
+			new Checksum(1168, 0x0000020e35b0f35dL));
 	}
 
 	@Test
-	public void testHashWithRMatStringGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x00000071dc80a623, count 16384\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "string",
-				"--output", "hash"},
-			expected);
+	public void testHashWithUndirectedRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000004627ab6L;
+				break;
+
+			case "long":
+				checksum = 0x0000000009193576L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00000001e9adcf56L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("RMatGraph", "hash", "--scale", "7", "--simplify", "undirected"),
+			1854, checksum);
 	}
 
 	@Test
-	public void testHashWithRMatStringDirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000005d58b3fa7d, count 12009\n";
+	public void testPrintWithUndirectedRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "string", "--simplify", "directed",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("RMatGraph", "print", "--scale", "7", "--simplify", "undirected"),
+			new Checksum(1854, 0x0000036fe5802162L));
 	}
 
 	@Test
-	public void testHashWithRMatStringUndirectedGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x000000aa54987304, count 20884\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "RMatGraph", "--type", "string", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+	public void testHashWithSingletonEdgeGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x000000000034d5a4L;
+				break;
+
+			case "long":
+				checksum = 0x00000000006b8224L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000000757c6a4L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("SingletonEdgeGraph", "hash", "--vertex_pair_count", "42"),
+			84, checksum);
 	}
 
 	@Test
-	public void testHashWithSingletonEdgeGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0000000001af8ee8, count 200\n";
+	public void testPrintWithSingletonEdgeGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "SingletonEdgeGraph", "--vertex_pair_count", "100",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("SingletonEdgeGraph", "print", "--vertex_pair_count", "42"),
+			new Checksum(84, 0x0000002e59e10d9aL));
 	}
 
 	@Test
 	public void testHashWithStarGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x000000000042789a, count 82\n";
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x00000000000d195aL;
+				break;
+
+			case "long":
+				checksum = 0x000000000042789aL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x00000000032f0adaL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("StarGraph", "hash", "--vertex_count", "42"),
+			82, checksum);
+	}
+
+	@Test
+	public void testPrintWithStarGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
 
-		expectedOutput(
-			new String[]{"--algorithm", "EdgeList",
-				"--input", "StarGraph", "--vertex_count", "42",
-				"--output", "hash"},
-			expected);
+		expectedOutputChecksum(
+			parameters("StarGraph", "print", "--vertex_count", "42"),
+			new Checksum(82, 0x00000011ec3faee8L));
 	}
 }