You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/09/20 14:12:39 UTC
[1/4] flink git commit: [FLINK-4572] [gelly] Convert to negative in
LongValueToIntValue
Repository: flink
Updated Branches:
refs/heads/master 0975d9f11 -> e58fd6e00
[FLINK-4572] [gelly] Convert to negative in LongValueToIntValue
The Gelly drivers expect that scale 32 edges, represented by the lower
32 bits of long values, can be converted to int values. Values between
2^31 and 2^32 - 1 should be converted to negative integers.
Creates separate signed and unsigned translators for long to int. This
prevents ambiguous conversion since each translator only works on a
32-bit range of values.
This closes #2469
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e58fd6e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e58fd6e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e58fd6e0
Branch: refs/heads/master
Commit: e58fd6e001b319aa7325a0544735f8c0680141d8
Parents: 4f84a02
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Sep 2 12:01:29 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400
----------------------------------------------------------------------
.../apache/flink/graph/driver/GraphMetrics.java | 6 +--
.../graph/examples/ClusteringCoefficient.java | 6 +--
.../org/apache/flink/graph/examples/HITS.java | 4 +-
.../flink/graph/examples/JaccardIndex.java | 4 +-
.../flink/graph/examples/TriangleListing.java | 6 +--
.../asm/translate/LongValueToIntValue.java | 43 -----------------
.../translate/LongValueToSignedIntValue.java | 44 ++++++++++++++++++
.../translate/LongValueToUnsignedIntValue.java | 49 ++++++++++++++++++++
.../LongValueToSignedIntValueTest.java | 49 ++++++++++++++++++++
.../LongValueToUnsignedIntValueTest.java | 49 ++++++++++++++++++++
10 files changed, 204 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
index cc265bb..44b64ee 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -180,7 +180,7 @@ public class GraphMetrics {
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
vm = newGraph
@@ -201,7 +201,7 @@ public class GraphMetrics {
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
vm = newGraph
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index f4b1ecf..9961fff 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -195,7 +195,7 @@ public class ClusteringCoefficient {
.setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
.setParallelism(little_parallelism));
@@ -225,7 +225,7 @@ public class ClusteringCoefficient {
.setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism));
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index 59612d9..b2ce726 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -137,7 +137,7 @@ public class HITS {
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 96f66ab..fa69ef0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -151,7 +151,7 @@ public class JaccardIndex {
.setLittleParallelism(little_parallelism));
} else {
ji = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
.setParallelism(little_parallelism))
.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism))
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index f3ce708..d819d19 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -159,7 +159,7 @@ public class TriangleListing {
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
}
@@ -175,7 +175,7 @@ public class TriangleListing {
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
} else {
tl = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
deleted file mode 100644
index adaf592..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.MathUtils;
-
-/**
- * Translate {@link LongValue} to {@link IntValue}.
- *
- * Throws {@link RuntimeException} for integer overflow.
- */
-public class LongValueToIntValue
-implements TranslateFunction<LongValue, IntValue> {
-
- @Override
- public IntValue translate(LongValue value, IntValue reuse)
- throws Exception {
- if (reuse == null) {
- reuse = new IntValue();
- }
-
- reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
- return reuse;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
new file mode 100644
index 0000000..92a4abc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.MathUtils;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToSignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+ @Override
+ public IntValue translate(LongValue value, IntValue reuse)
+ throws Exception {
+ if (reuse == null) {
+ reuse = new IntValue();
+ }
+
+ reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
+
+ return reuse;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
new file mode 100644
index 0000000..943e727
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToUnsignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+ @Override
+ public IntValue translate(LongValue value, IntValue reuse)
+ throws Exception {
+ if (reuse == null) {
+ reuse = new IntValue();
+ }
+
+ long l = value.getValue();
+
+ if (l < 0 || l >= (1L << 32)) {
+ throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
+ } else {
+ reuse.setValue((int)(l & 0xffffffffL));
+ }
+
+ return reuse;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
new file mode 100644
index 0000000..9e2ec0a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToSignedIntValueTest {
+
+ private TranslateFunction<LongValue, IntValue> translator = new LongValueToSignedIntValue();
+
+ private IntValue reuse = new IntValue();
+
+ @Test
+ public void testTranslation() throws Exception {
+ assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MIN_VALUE), reuse));
+ assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
+ assertEquals(new IntValue(Integer.MAX_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE), reuse));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testUpperOutOfRange() throws Exception {
+ assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testLowerOutOfRange() throws Exception {
+ assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
new file mode 100644
index 0000000..14e9685
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToUnsignedIntValueTest {
+
+ private TranslateFunction<LongValue, IntValue> translator = new LongValueToUnsignedIntValue();
+
+ private IntValue reuse = new IntValue();
+
+ @Test
+ public void testTranslation() throws Exception {
+ assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
+ assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+ assertEquals(new IntValue(-1), translator.translate(new LongValue((1L << 32) - 1), reuse));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testUpperOutOfRange() throws Exception {
+ assertEquals(new IntValue(), translator.translate(new LongValue(1L << 32), reuse));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testLowerOutOfRange() throws Exception {
+ assertEquals(new IntValue(), translator.translate(new LongValue(-1), reuse));
+ }
+}
[2/4] flink git commit: [FLINK-4609] [java-api] Remove redundant
check for null in CrossOperator
Posted by gr...@apache.org.
[FLINK-4609] [java-api] Remove redundant check for null in CrossOperator
This closes #2490
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/470b752e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/470b752e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/470b752e
Branch: refs/heads/master
Commit: 470b752e693ae4292d34fc7fc0c778f95fa04fd9
Parents: 7a25bf5
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 22:24:43 2016 -0700
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400
----------------------------------------------------------------------
.../flink/api/java/operators/CrossOperator.java | 23 ++++++--------------
.../api/java/operators/TwoInputOperator.java | 5 +++--
2 files changed, 10 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/470b752e/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 36e6c1c..3fdc51d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -20,9 +20,9 @@ package org.apache.flink.api.java.operators;
import java.util.Arrays;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -124,21 +124,12 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
@Public
public static final class DefaultCross<I1, I2> extends CrossOperator<I1, I2, Tuple2<I1, I2>> {
- private final DataSet<I1> input1;
- private final DataSet<I2> input2;
-
public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {
-
super(input1, input2, new DefaultCrossFunction<I1, I2>(),
- new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()),
- hint, defaultName);
-
- if (input1 == null || input2 == null) {
- throw new NullPointerException();
- }
-
- this.input1 = input1;
- this.input2 = input2;
+ new TupleTypeInfo<Tuple2<I1, I2>>(
+ Preconditions.checkNotNull(input1, "input1 is null").getType(),
+ Preconditions.checkNotNull(input2, "input2 is null").getType()),
+ hint, defaultName);
}
/**
@@ -155,9 +146,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
if (function == null) {
throw new NullPointerException("Cross function must not be null.");
}
- TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType(),
+ TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, getInput1().getType(), getInput2().getType(),
super.getDefaultName(), true);
- return new CrossOperator<I1, I2, R>(input1, input2, clean(function), returnType,
+ return new CrossOperator<I1, I2, R>(getInput1(), getInput2(), clean(function), returnType,
getCrossHint(), Utils.getCallLocationName());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/470b752e/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
index c64882d..28dec32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Preconditions;
/**
* Base class for operations that operates on two input data sets.
@@ -37,8 +38,8 @@ public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator
protected TwoInputOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType) {
- super(input1.getExecutionEnvironment(), resultType);
-
+ super(Preconditions.checkNotNull(input1, "input1 is null").getExecutionEnvironment(), resultType);
+ Preconditions.checkNotNull(input2, "input2 is null");
DataSet.checkSameExecutionContext(input1, input2);
this.input1 = input1;
this.input2 = input2;
[4/4] flink git commit: [FLINK-4638] [core] Fix exception message for
MemorySegment
Posted by gr...@apache.org.
[FLINK-4638] [core] Fix exception message for MemorySegment
This closes #2515
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a25bf5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a25bf5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a25bf5c
Branch: refs/heads/master
Commit: 7a25bf5cee9ab94525ed0284cbf399d2c33f70cf
Parents: 0975d9f
Author: Liwei Lin <lw...@gmail.com>
Authored: Tue Sep 20 14:35:28 2016 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/core/memory/MemorySegment.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a25bf5c/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index af3efe7..d8315c5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -161,7 +161,7 @@ public abstract class MemorySegment {
}
if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
// this is necessary to make sure the collapsed checks are safe against numeric overflows
- throw new IllegalArgumentException("Segment initialized with too large address: " + address
+ throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
+ " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
}
[3/4] flink git commit: [FLINK-4594] [core] Validate lower bound in
MathUtils.checkedDownCast
Posted by gr...@apache.org.
[FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast
This closes #2481
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f84a02f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f84a02f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f84a02f
Branch: refs/heads/master
Commit: 4f84a02f4b8be5948b233531fbcb07dfdb5e7930
Parents: 470b752
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 8 10:35:39 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400
----------------------------------------------------------------------
flink-core/src/main/java/org/apache/flink/util/MathUtils.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4f84a02f/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 49056cc..074e8ae 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -80,12 +80,14 @@ public final class MathUtils {
*
* @param value The value to be cast to an integer.
* @return The given value as an integer.
+ * @see Math#toIntExact(long)
*/
public static int checkedDownCast(long value) {
- if (value > Integer.MAX_VALUE) {
+ int downCast = (int) value;
+ if (downCast != value) {
throw new IllegalArgumentException("Cannot downcast long value " + value + " to integer.");
}
- return (int) value;
+ return downCast;
}
/**