You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/08 05:29:13 UTC

[flink] branch master updated: [FLINK-12326][python] Add basic test framework for python table api.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 84eec21  [FLINK-12326][python] Add basic test framework for python table api.
84eec21 is described below

commit 84eec21108f2c05fa872c9a3735457d73f75dc51
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Mon May 6 10:05:19 2019 +0800

    [FLINK-12326][python] Add basic test framework for python table api.
    
    This closes #8347
---
 .../main/flink-bin/bin/pyflink-gateway-server.sh   |  17 ++-
 flink-python/pyflink/find_flink_home.py            |  19 ++-
 flink-python/pyflink/table/tests/test_calc.py      |  70 +++++++++++
 .../pyflink/table/tests/test_end_to_end.py         |  73 ------------
 .../pyflink/testing/__init__.py                    |  39 +-----
 flink-python/pyflink/testing/source_sink_utils.py  | 131 +++++++++++++++++++++
 flink-python/pyflink/testing/test_case_utils.py    |  92 +++++++++++++++
 .../runtime/stream/table/TableSinkITCase.scala     |  23 ++--
 8 files changed, 338 insertions(+), 126 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
index b63a2ac..026f813 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
@@ -51,4 +51,19 @@ log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-$HOSTNAME.log
 log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
 TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
-exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH} ${DRIVER} ${ARGS[@]}
+
+FLINK_TEST_CLASSPATH=""
+if [[ -n "$FLINK_TESTING" ]]; then
+  bin=`dirname "$0"`
+  FLINK_SOURCE_ROOT_DIR=`cd "$bin/../../"; pwd -P`
+  # Collect every flink-*-tests.jar below the source root (NUL-delimited, sorted) into a ':'-joined list.
+  while read -d '' -r testJarFile ; do
+    if [[ "$FLINK_TEST_CLASSPATH" == "" ]]; then
+      FLINK_TEST_CLASSPATH="$testJarFile";
+    else
+      FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile"
+    fi
+  done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z)
+fi
+# Append the test jars only when there are any: a trailing ':' is an empty classpath entry (the cwd).
+exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}${FLINK_TEST_CLASSPATH:+:${FLINK_TEST_CLASSPATH}} ${DRIVER} ${ARGS[@]}
diff --git a/flink-python/pyflink/find_flink_home.py b/flink-python/pyflink/find_flink_home.py
index b84a7c3..0491368 100644
--- a/flink-python/pyflink/find_flink_home.py
+++ b/flink-python/pyflink/find_flink_home.py
@@ -15,7 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from __future__ import print_function
+
+import logging
 import os
 import sys
 
@@ -37,10 +38,22 @@ def _find_flink_home():
                 return build_target
         except Exception:
             pass
-        print("Could not find valid FLINK_HOME(Flink distribution directory) "
-              "in current environment.", file=sys.stderr)
+        logging.error("Could not find valid FLINK_HOME(Flink distribution directory) "
+                      "in current environment.")
         sys.exit(-1)
 
 
+def _find_flink_source_root():
+    """
+    Find the flink source root directory.
+    """
+    try:
+        return os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
+    except Exception:
+        pass
+    logging.error("Could not find valid flink source root directory in current environment.")
+    sys.exit(-1)
+
+
 if __name__ == "__main__":
     print(_find_flink_home())
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
new file mode 100644
index 0000000..70829dc
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -0,0 +1,70 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
+
+    def test_select(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        # the with-block closes the file on exit, so no explicit close() is needed
+        with open(source_path, 'w') as f:
+            lines = '1,hi,hello\n' + '2,hi,hello\n'
+            f.write(lines)
+
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+
+        t_env = self.t_env
+
+        # register Orders table in table environment
+        t_env.register_table_source(
+            "Orders",
+            CsvTableSource(source_path, field_names, field_types))
+
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        t_env.scan("Orders") \
+             .where("a > 0") \
+             .select("a + 1, b, c") \
+             .insert_into("Results")
+
+        t_env.execute()
+
+        actual = source_sink_utils.results()
+        expected = ['2,hi,hello', '3,hi,hello']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_end_to_end.py b/flink-python/pyflink/table/tests/test_end_to_end.py
deleted file mode 100644
index 7edd0b0..0000000
--- a/flink-python/pyflink/table/tests/test_end_to_end.py
+++ /dev/null
@@ -1,73 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import os
-import tempfile
-
-from pyflink.find_flink_home import _find_flink_home
-from pyflink.table import TableEnvironment, TableConfig
-from pyflink.table.table_sink import CsvTableSink
-from pyflink.table.table_source import CsvTableSource
-from pyflink.table.types import DataTypes
-
-
-def test_end_to_end():
-    tmp_dir = tempfile.gettempdir()
-    source_path = tmp_dir + '/streaming.csv'
-    if os.path.isfile(source_path):
-        os.remove(source_path)
-    with open(source_path, 'w') as f:
-        lines = '1,hi,hello\n' + '2,hi,hello\n'
-        f.write(lines)
-        f.close()
-    _find_flink_home()
-    print("using %s as FLINK_HOME..." % os.environ["FLINK_HOME"])
-
-    t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
-    t_env = TableEnvironment.get_table_environment(t_config)
-
-    field_names = ["a", "b", "c"]
-    field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
-    # register Orders table in table environment
-    t_env.register_table_source(
-        "Orders",
-        CsvTableSource(source_path, field_names, field_types))
-
-    # register Results table in table environment
-    tmp_dir = tempfile.gettempdir()
-    tmp_csv = tmp_dir + '/streaming2.csv'
-    if os.path.isfile(tmp_csv):
-        os.remove(tmp_csv)
-
-    t_env.register_table_sink(
-        "Results",
-        field_names, field_types, CsvTableSink(tmp_csv))
-
-    t_env.scan("Orders") \
-         .where("a > 0") \
-         .select("a + 1, b, c") \
-         .insert_into("Results")
-
-    t_env.execute()
-    with open(tmp_csv, 'r') as f:
-        lines = f.read()
-        assert lines == '2,hi,hello\n' + '3,hi,hello\n'
-    print("test passed, the log file is under this directory: %s/log" % os.environ["FLINK_HOME"])
-
-if __name__ == '__main__':
-    test_end_to_end()
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-python/pyflink/testing/__init__.py
similarity index 50%
copy from flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
copy to flink-python/pyflink/testing/__init__.py
index b63a2ac..e154fad 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-python/pyflink/testing/__init__.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -15,40 +14,4 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 # limitations under the License.
-################################################################################
-
-# =====================================================================
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-        FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_CLASSPATH=`constructFlinkClassPath`
-
-ARGS=()
-
-while [[ $# -gt 0 ]]
-do
-    key="$1"
-    case $key in
-        -c|--class)
-            DRIVER=$2
-            shift
-            shift
-            ;;
-        *)
-           ARGS+=("$1")
-           shift
-           ;;
-    esac
-done
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-$HOSTNAME.log
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
-exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH} ${DRIVER} ${ARGS[@]}
+#################################################################################
diff --git a/flink-python/pyflink/testing/source_sink_utils.py b/flink-python/pyflink/testing/source_sink_utils.py
new file mode 100644
index 0000000..b9f28d8
--- /dev/null
+++ b/flink-python/pyflink/testing/source_sink_utils.py
@@ -0,0 +1,131 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import glob
+import os
+import sys
+import unittest
+from py4j.java_gateway import java_import
+
+from pyflink.find_flink_home import _find_flink_source_root
+from pyflink.java_gateway import get_gateway
+from pyflink.table import TableSink
+
+if sys.version_info[0] >= 3:
+    xrange = range
+
+
+class TestTableSink(TableSink):
+    """
+    Base class for test table sink.
+    """
+
+    _inited = False
+
+    def __init__(self, j_table_sink):
+        super(TestTableSink, self).__init__(j_table_sink)
+
+    @classmethod
+    def _ensure_initialized(cls):
+        if TestTableSink._inited:
+            return
+
+        FLINK_SOURCE_ROOT_DIR = _find_flink_source_root()
+        filename_pattern = (
+            "flink-table/flink-table-planner/target/"
+            "flink-table-planner*-tests.jar")
+        if not glob.glob(os.path.join(FLINK_SOURCE_ROOT_DIR, filename_pattern)):
+            raise unittest.SkipTest(
+                "'flink-table-planner*-tests.jar' is not available. Will skip the related tests.")
+
+        gateway = get_gateway()
+        java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestAppendSink")
+        java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestRetractSink")
+        java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestUpsertSink")
+        java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.RowCollector")
+
+        TestTableSink._inited = True
+
+
+class TestAppendSink(TestTableSink):
+    """
+    A test append table sink.
+    """
+
+    def __init__(self):
+        TestTableSink._ensure_initialized()
+
+        gateway = get_gateway()
+        super(TestAppendSink, self).__init__(gateway.jvm.TestAppendSink())
+
+
+class TestRetractSink(TestTableSink):
+    """
+    A test retract table sink.
+    """
+
+    def __init__(self):
+        TestTableSink._ensure_initialized()
+
+        gateway = get_gateway()
+        super(TestRetractSink, self).__init__(gateway.jvm.TestRetractSink())
+
+
+class TestUpsertSink(TestTableSink):
+    """
+    A test upsert table sink.
+    """
+
+    def __init__(self, keys, is_append_only):
+        TestTableSink._ensure_initialized()
+
+        gateway = get_gateway()
+        j_keys = gateway.new_array(gateway.jvm.String, len(keys))
+        for i in xrange(0, len(keys)):
+            j_keys[i] = keys[i]
+
+        super(TestUpsertSink, self).__init__(gateway.jvm.TestUpsertSink(j_keys, is_append_only))
+
+
+def results():
+    """
+    Retrieves the results from an append table sink.
+    """
+    return retract_results()
+
+
+def retract_results():
+    """
+    Retrieves the results from a retract table sink.
+    """
+    gateway = get_gateway()
+    results = gateway.jvm.RowCollector.getAndClearValues()
+    return gateway.jvm.RowCollector.retractResults(results)
+
+
+def upsert_results(keys):
+    """
+    Retrieves the results from an upsert table sink.
+    """
+    gateway = get_gateway()
+    j_keys = gateway.new_array(gateway.jvm.int, len(keys))
+    for i in xrange(0, len(keys)):
+        j_keys[i] = keys[i]
+
+    results = gateway.jvm.RowCollector.getAndClearValues()
+    return gateway.jvm.RowCollector.upsertResults(results, j_keys)
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
new file mode 100644
index 0000000..d04df91
--- /dev/null
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -0,0 +1,92 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+from pyflink.find_flink_home import _find_flink_home
+from pyflink.table import TableEnvironment, TableConfig
+
+if sys.version_info[0] >= 3:
+    xrange = range
+
+if os.getenv("VERBOSE"):
+    log_level = logging.DEBUG
+else:
+    log_level = logging.INFO
+logging.basicConfig(stream=sys.stdout, level=log_level,
+                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+
+
+class PyFlinkTestCase(unittest.TestCase):
+    """
+    Base class for unit tests: provides a per-class temp dir and result assertions.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        # FLINK_TESTING makes pyflink-gateway-server.sh add the flink-*-tests.jar files to the classpath
+        os.environ["FLINK_TESTING"] = "1"
+        _find_flink_home()
+
+        logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    @classmethod
+    def assert_equals(cls, actual, expected):
+        # Order-insensitive; compares whole lists because zip() would hide a length mismatch.
+        actual_py_list = sorted(cls.to_py_list(actual))
+        expected.sort()
+        assert actual_py_list == expected, "%s != %s" % (actual_py_list, expected)
+
+    @classmethod
+    def to_py_list(cls, actual):
+        py_list = []
+        for i in xrange(0, actual.length()):
+            py_list.append(actual.apply(i))
+        return py_list
+
+
+class PyFlinkStreamTableTestCase(PyFlinkTestCase):
+    """
+    Base class for stream unit tests.
+    """
+
+    def setUp(self):
+        super(PyFlinkStreamTableTestCase, self).setUp()
+        self.t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(4).build()
+        self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PyFlinkBatchTableTestCase(PyFlinkTestCase):
+    """
+    Base class for batch unit tests.
+    """
+
+    def setUp(self):
+        super(PyFlinkBatchTableTestCase, self).setUp()
+        self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
+        self.t_env = TableEnvironment.get_table_environment(self.t_config)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index c5c23cb..dbfdb14 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -680,18 +680,19 @@ private[flink] class TestUpsertSink(
 
   override def setKeyFields(keys: Array[String]): Unit =
     if (keys != null) {
-      assertEquals("Provided key fields do not match expected keys",
-        expectedKeys.sorted.mkString(","),
-        keys.sorted.mkString(","))
+      if (!expectedKeys.sorted.mkString(",").equals(keys.sorted.mkString(","))) {
+        throw new AssertionError("Provided key fields do not match expected keys")
+      }
     } else {
-      assertNull("Provided key fields should not be null.", expectedKeys)
+      if (expectedKeys != null) {
+        throw new AssertionError("Provided key fields should not be null.")
+      }
     }
 
   override def setIsAppendOnly(isAppendOnly: JBool): Unit =
-    assertEquals(
-      "Provided isAppendOnly does not match expected isAppendOnly",
-      expectedIsAppendOnly,
-      isAppendOnly)
+    if (expectedIsAppendOnly != isAppendOnly) {
+      throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly")
+    }
 
   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
 
@@ -749,9 +750,9 @@ object RowCollector {
         }
       }.filter{ case (_, c: Int) => c != 0 }
 
-    assertFalse(
-      "Received retracted rows which have not been accumulated.",
-      retracted.exists{ case (_, c: Int) => c < 0})
+    if (retracted.exists{ case (_, c: Int) => c < 0}) {
+      throw new AssertionError("Received retracted rows which have not been accumulated.")
+    }
 
     retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
   }