You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/09/23 18:48:16 UTC

[1/2] flink git commit: [FLINK-4664] [gelly] Add translator to NullValue

Repository: flink
Updated Branches:
  refs/heads/master 5d0358af4 -> 40c978b04


[FLINK-4664] [gelly] Add translator to NullValue

This translator is appropriate for translating vertex and edge values to
NullValue when the values are not used by an algorithm. Also, added
tests and moved translators into a subpackage.

This closes #2536


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd90abe6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd90abe6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd90abe6

Branch: refs/heads/master
Commit: dd90abe669326fe3be3b9c37a158192211a4606f
Parents: 5d0358a
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 22 16:47:17 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Sep 23 13:46:24 2016 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/driver/GraphMetrics.java |  2 +-
 .../graph/examples/ClusteringCoefficient.java   |  2 +-
 .../org/apache/flink/graph/examples/HITS.java   |  2 +-
 .../flink/graph/examples/JaccardIndex.java      |  2 +-
 .../flink/graph/examples/TriangleListing.java   |  2 +-
 .../graph/asm/translate/LongValueAddOffset.java | 50 ------------------
 .../translate/LongValueToSignedIntValue.java    | 44 ----------------
 .../asm/translate/LongValueToStringValue.java   | 40 ---------------
 .../translate/LongValueToUnsignedIntValue.java  | 49 ------------------
 .../translators/LongValueAddOffset.java         | 53 ++++++++++++++++++++
 .../translators/LongValueToSignedIntValue.java  | 45 +++++++++++++++++
 .../translators/LongValueToStringValue.java     | 41 +++++++++++++++
 .../LongValueToUnsignedIntValue.java            | 50 ++++++++++++++++++
 .../asm/translate/translators/ToNullValue.java  | 37 ++++++++++++++
 .../LongValueToSignedIntValueTest.java          | 49 ------------------
 .../LongValueToUnsignedIntValueTest.java        | 49 ------------------
 .../graph/asm/translate/TranslateTest.java      |  1 +
 .../translators/LongValueAddOffsetTest.java     | 47 +++++++++++++++++
 .../LongValueToSignedIntValueTest.java          | 50 ++++++++++++++++++
 .../translators/LongValueToStringValueTest.java | 40 +++++++++++++++
 .../LongValueToUnsignedIntValueTest.java        | 50 ++++++++++++++++++
 .../translate/translators/ToNullValueTest.java  | 43 ++++++++++++++++
 22 files changed, 462 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
index 44b64ee..79c5f80 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 9961fff..7835531 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index b2ce726..f70d5dc 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index fa69ef0..4cfbc71 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index d819d19..43c5eba 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
deleted file mode 100644
index 4b55307..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.LongValue;
-
-/**
- * Translate {@link LongValue} by adding a constant offset value.
- */
-public class LongValueAddOffset
-implements TranslateFunction<LongValue, LongValue> {
-
-	private final long offset;
-
-	/**
-	 * Translate {@link LongValue} by adding a constant offset value.
-	 *
-	 * @param offset value to be added to each element
-	 */
-	public LongValueAddOffset(long offset) {
-		this.offset = offset;
-	}
-
-	@Override
-	public LongValue translate(LongValue value, LongValue reuse)
-			throws Exception {
-		if (reuse == null) {
-			reuse = new LongValue();
-		}
-
-		reuse.setValue(offset + value.getValue());
-		return reuse;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
deleted file mode 100644
index 92a4abc..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.MathUtils;
-
-/**
- * Translate {@link LongValue} to {@link IntValue}.
- *
- * Throws {@link RuntimeException} for integer overflow.
- */
-public class LongValueToSignedIntValue
-implements TranslateFunction<LongValue, IntValue> {
-
-	@Override
-	public IntValue translate(LongValue value, IntValue reuse)
-			throws Exception {
-		if (reuse == null) {
-			reuse = new IntValue();
-		}
-
-		reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
-
-		return reuse;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
deleted file mode 100644
index bdcf37c..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Translate {@link LongValue} to {@link StringValue}.
- */
-public class LongValueToStringValue
-implements TranslateFunction<LongValue, StringValue> {
-
-	@Override
-	public StringValue translate(LongValue value, StringValue reuse)
-			throws Exception {
-		if (reuse == null) {
-			reuse = new StringValue();
-		}
-
-		reuse.setValue(Long.toString(value.getValue()));
-		return reuse;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
deleted file mode 100644
index 943e727..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-/**
- * Translate {@link LongValue} to {@link IntValue}.
- *
- * Throws {@link RuntimeException} for integer overflow.
- */
-public class LongValueToUnsignedIntValue
-implements TranslateFunction<LongValue, IntValue> {
-
-	@Override
-	public IntValue translate(LongValue value, IntValue reuse)
-			throws Exception {
-		if (reuse == null) {
-			reuse = new IntValue();
-		}
-
-		long l = value.getValue();
-
-		if (l < 0 || l >= (1L << 32)) {
-			throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
-		} else {
-			reuse.setValue((int)(l & 0xffffffffL));
-		}
-
-		return reuse;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffset.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffset.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffset.java
new file mode 100644
index 0000000..d44ece4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffset.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} by adding a constant offset value.
+ */
+public class LongValueAddOffset
+implements TranslateFunction<LongValue, LongValue> {
+
+	private final long offset;
+
+	/**
+	 * Translate {@link LongValue} by adding a constant offset value.
+	 *
+	 * The summation is *not* checked for overflow or underflow.
+	 *
+	 * @param offset value to be added to each element
+	 */
+	public LongValueAddOffset(long offset) {
+		this.offset = offset;
+	}
+
+	@Override
+	public LongValue translate(LongValue value, LongValue reuse)
+			throws Exception {
+		if (reuse == null) {
+			reuse = new LongValue();
+		}
+
+		reuse.setValue(offset + value.getValue());
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
new file mode 100644
index 0000000..038a2e4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.MathUtils;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToSignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+	@Override
+	public IntValue translate(LongValue value, IntValue reuse)
+			throws Exception {
+		if (reuse == null) {
+			reuse = new IntValue();
+		}
+
+		reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
+
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValue.java
new file mode 100644
index 0000000..0c2e87e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValue.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Translate {@link LongValue} to {@link StringValue}.
+ */
+public class LongValueToStringValue
+implements TranslateFunction<LongValue, StringValue> {
+
+	@Override
+	public StringValue translate(LongValue value, StringValue reuse)
+			throws Exception {
+		if (reuse == null) {
+			reuse = new StringValue();
+		}
+
+		reuse.setValue(Long.toString(value.getValue()));
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
new file mode 100644
index 0000000..8fe665c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToUnsignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+	@Override
+	public IntValue translate(LongValue value, IntValue reuse)
+			throws Exception {
+		if (reuse == null) {
+			reuse = new IntValue();
+		}
+
+		long l = value.getValue();
+
+		if (l < 0 || l >= (1L << 32)) {
+			throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
+		} else {
+			reuse.setValue((int)(l & 0xffffffffL));
+		}
+
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/ToNullValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/ToNullValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/ToNullValue.java
new file mode 100644
index 0000000..9b44b27
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/ToNullValue.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Replace any type with {@link NullValue}.
+ *
+ * @param <T> the type of the source value
+ */
+public class ToNullValue<T>
+implements TranslateFunction<T, NullValue> {
+
+	@Override
+	public NullValue translate(T value, NullValue reuse)
+			throws Exception {
+		return NullValue.getInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
deleted file mode 100644
index 9e2ec0a..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class LongValueToSignedIntValueTest {
-
-	private TranslateFunction<LongValue, IntValue> translator = new LongValueToSignedIntValue();
-
-	private IntValue reuse = new IntValue();
-
-	@Test
-	public void testTranslation() throws Exception {
-		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MIN_VALUE), reuse));
-		assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
-		assertEquals(new IntValue(Integer.MAX_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE), reuse));
-	}
-
-	@Test(expected=IllegalArgumentException.class)
-	public void testUpperOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
-	}
-
-	@Test(expected=IllegalArgumentException.class)
-	public void testLowerOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
deleted file mode 100644
index 14e9685..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class LongValueToUnsignedIntValueTest {
-
-	private TranslateFunction<LongValue, IntValue> translator = new LongValueToUnsignedIntValue();
-
-	private IntValue reuse = new IntValue();
-
-	@Test
-	public void testTranslation() throws Exception {
-		assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
-		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
-		assertEquals(new IntValue(-1), translator.translate(new LongValue((1L << 32) - 1), reuse));
-	}
-
-	@Test(expected=IllegalArgumentException.class)
-	public void testUpperOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue(1L << 32), reuse));
-	}
-
-	@Test(expected=IllegalArgumentException.class)
-	public void testLowerOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue(-1), reuse));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index e13aa6a..0cf026c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffsetTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffsetTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffsetTest.java
new file mode 100644
index 0000000..ad63209
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueAddOffsetTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueAddOffsetTest {
+
+	@Test
+	public void testTranslation() throws Exception {
+		LongValue reuse = new LongValue();
+
+		assertEquals(new LongValue(0), new LongValueAddOffset(0).translate(new LongValue(0), reuse));
+		assertEquals(new LongValue(3), new LongValueAddOffset(1).translate(new LongValue(2), reuse));
+		assertEquals(new LongValue(1), new LongValueAddOffset(-1).translate(new LongValue(2), reuse));
+
+		assertEquals(new LongValue(-1), new LongValueAddOffset(Long.MIN_VALUE).translate(new LongValue(Long.MAX_VALUE), reuse));
+		assertEquals(new LongValue(-1), new LongValueAddOffset(Long.MAX_VALUE).translate(new LongValue(Long.MIN_VALUE), reuse));
+
+		// underflow wraps to positive values
+		assertEquals(new LongValue(Long.MAX_VALUE), new LongValueAddOffset(-1).translate(new LongValue(Long.MIN_VALUE), reuse));
+		assertEquals(new LongValue(0), new LongValueAddOffset(Long.MIN_VALUE).translate(new LongValue(Long.MIN_VALUE), reuse));
+
+		// overflow wraps to negative values
+		assertEquals(new LongValue(Long.MIN_VALUE), new LongValueAddOffset(1).translate(new LongValue(Long.MAX_VALUE), reuse));
+		assertEquals(new LongValue(-2), new LongValueAddOffset(Long.MAX_VALUE).translate(new LongValue(Long.MAX_VALUE), reuse));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
new file mode 100644
index 0000000..976cc34
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToSignedIntValueTest {
+
+	private TranslateFunction<LongValue, IntValue> translator = new LongValueToSignedIntValue();
+
+	private IntValue reuse = new IntValue();
+
+	@Test
+	public void testTranslation() throws Exception {
+		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MIN_VALUE), reuse));
+		assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
+		assertEquals(new IntValue(Integer.MAX_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE), reuse));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testUpperOutOfRange() throws Exception {
+		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testLowerOutOfRange() throws Exception {
+		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValueTest.java
new file mode 100644
index 0000000..8980cd3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToStringValueTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToStringValueTest {
+
+	private TranslateFunction<LongValue, StringValue> translator = new LongValueToStringValue();
+
+	@Test
+	public void testTranslation() throws Exception {
+		StringValue reuse = new StringValue();
+
+		assertEquals(new StringValue("-9223372036854775808"), translator.translate(new LongValue(Long.MIN_VALUE), reuse));
+		assertEquals(new StringValue("0"), translator.translate(new LongValue(0), reuse));
+		assertEquals(new StringValue("9223372036854775807"), translator.translate(new LongValue(Long.MAX_VALUE), reuse));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
new file mode 100644
index 0000000..6a2ae83
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToUnsignedIntValueTest {
+
+	private TranslateFunction<LongValue, IntValue> translator = new LongValueToUnsignedIntValue();
+
+	private IntValue reuse = new IntValue();
+
+	@Test
+	public void testTranslation() throws Exception {
+		assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
+		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+		assertEquals(new IntValue(-1), translator.translate(new LongValue((1L << 32) - 1), reuse));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testUpperOutOfRange() throws Exception {
+		assertEquals(new IntValue(), translator.translate(new LongValue(1L << 32), reuse));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testLowerOutOfRange() throws Exception {
+		assertEquals(new IntValue(), translator.translate(new LongValue(-1), reuse));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd90abe6/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/ToNullValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/ToNullValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/ToNullValueTest.java
new file mode 100644
index 0000000..6d8a8da
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/ToNullValueTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate.translators;
+
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ToNullValueTest {
+
+	@Test
+	public void testTranslation() throws Exception {
+		NullValue reuse = NullValue.getInstance();
+
+		assertEquals(NullValue.getInstance(), new ToNullValue<>().translate(new DoubleValue(), reuse));
+		assertEquals(NullValue.getInstance(), new ToNullValue<>().translate(new FloatValue(), reuse));
+		assertEquals(NullValue.getInstance(), new ToNullValue<>().translate(new IntValue(), reuse));
+		assertEquals(NullValue.getInstance(), new ToNullValue<>().translate(new LongValue(), reuse));
+		assertEquals(NullValue.getInstance(), new ToNullValue<>().translate(new StringValue(), reuse));
+	}
+}


[2/2] flink git commit: [FLINK-4668] [clients] Fix positive random int generation

Posted by gr...@apache.org.
[FLINK-4668] [clients] Fix positive random int generation

This closes #2539


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40c978b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40c978b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40c978b0

Branch: refs/heads/master
Commit: 40c978b0443d9cb3ee66b6e0e63ab61416deac25
Parents: dd90abe
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Thu Sep 22 23:26:27 2016 -0700
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Sep 23 13:48:15 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/client/program/PackagedProgram.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40c978b0/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index bff8e3e..2a88043 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -679,7 +679,7 @@ public class PackagedProgram {
 					
 						File tempFile;
 						try {
-							tempFile = File.createTempFile(String.valueOf(Math.abs(rnd.nextInt()) + "_"), name);
+							tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name);
 							tempFile.deleteOnExit();
 						}
 						catch (IOException e) {