You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/10 12:08:13 UTC

[2/2] flink git commit: [FLINK-4002] [py] Improve testing infrastructure

[FLINK-4002] [py] Improve testing infrastructure

This closes #2063


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76dcbd45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76dcbd45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76dcbd45

Branch: refs/heads/master
Commit: 76dcbd458817566166022c400c00df972043453e
Parents: e90ea9c
Author: omaralvarez <om...@udc.es>
Authored: Fri Jun 10 14:06:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 10 14:07:41 2016 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinderTest.java  | 11 ++-
 .../org/apache/flink/python/api/test_main.py    | 38 +--------
 .../org/apache/flink/python/api/test_main2.py   | 40 +---------
 .../org/apache/flink/python/api/utils/utils.py  | 81 ++++++++++++++++++++
 4 files changed, 92 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 244e6b7..c53d408 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -28,6 +28,12 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		return true;
 	}
 
+	private static String findUtilsFile() throws Exception {
+		FileSystem fs = FileSystem.getLocalFileSystem();
+		return fs.getWorkingDirectory().toString()
+				+ "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+	}
+
 	private static List<String> findTestFiles() throws Exception {
 		List<String> files = new ArrayList();
 		FileSystem fs = FileSystem.getLocalFileSystem();
@@ -63,14 +69,15 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
+		String utils = findUtilsFile();
 		if (isPython2Supported()) {
 			for (String file : findTestFiles()) {
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file, utils});
 			}
 		}
 		if (isPython3Supported()) {
 			for (String file : findTestFiles()) {
-				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file, utils});
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 9b0f144..c0a4414 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -25,43 +25,7 @@ from flink.functions.GroupReduceFunction import GroupReduceFunction
 from flink.plan.Constants import Order, WriteMode
 from flink.plan.Constants import INT, STRING
 import struct
-
-#Utilities
-class Id(MapFunction):
-    def map(self, value):
-        return value
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        index = 0
-        for value in iterator:
-            if value != self.expected[index]:
-                raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
-            index += 1
-        #collector.collect(self.name + " successful!")
-
-
-class Verify2(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify2, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        for value in iterator:
-            if value in self.expected:
-                try:
-                    self.expected.remove(value)
-                except Exception:
-                    raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected))
-        #collector.collect(self.name + " successful!")
-
+from utils import Id, Verify
 
 if __name__ == "__main__":
     env = get_environment()

http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index 787928c..2ea6f91 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -18,48 +18,10 @@
 ################################################################################
 from flink.plan.Environment import get_environment
 from flink.functions.MapFunction import MapFunction
-from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
-
-
-#Utilities
-class Id(MapFunction):
-    def map(self, value):
-        return value
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        index = 0
-        for value in iterator:
-            if value != self.expected[index]:
-                raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
-            index += 1
-        #collector.collect(self.name + " successful!")
-
-
-class Verify2(MapPartitionFunction):
-    def __init__(self, expected, name):
-        super(Verify2, self).__init__()
-        self.expected = expected
-        self.name = name
-
-    def map_partition(self, iterator, collector):
-        for value in iterator:
-            if value in self.expected:
-                try:
-                    self.expected.remove(value)
-                except Exception:
-                    raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected))
-        #collector.collect(self.name + " successful!")
-
+from utils import Verify, Verify2
 
 if __name__ == "__main__":
     env = get_environment()

http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py
new file mode 100644
index 0000000..78999b1
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py
@@ -0,0 +1,81 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+
+
+class Id(MapFunction):
+    def map(self, value):
+        """
+        Simple map function to forward test results.
+
+        :param value: Input value.
+        :return: Forwarded value.
+        """
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        """
+        Compares elements in the expected values list against actual values in resulting DataSet.
+
+        :param iterator: Iterator for the corresponding DataSet partition.
+        :param collector: Collector for the result records.
+        """
+        index = 0
+        for value in iterator:
+            try:
+                if value != self.expected[index]:
+                    raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+            except IndexError:
+                raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.")
+            index += 1
+        if len(self.expected) != index:
+            raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.")
+        #collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        """
+        Compares elements in the expected values list against actual values in resulting DataSet.
+
+        This function does not compare element by element, since for example in a Union order is not guaranteed.
+
+        Elements are removed from the expected values list for the whole DataSet.
+
+        :param iterator: Iterator for the corresponding DataSet partition.
+        :param collector: Collector for the result records.
+        """
+        for value in iterator:
+            try:
+                self.expected.remove(value)
+            except Exception:
+                raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: " + str(self.expected))
+        #collector.collect(self.name + " successful!")