You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/08 05:29:13 UTC
[flink] branch master updated: [FLINK-12326][python] Add basic test
framework for python table api.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 84eec21 [FLINK-12326][python] Add basic test framework for python table api.
84eec21 is described below
commit 84eec21108f2c05fa872c9a3735457d73f75dc51
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Mon May 6 10:05:19 2019 +0800
[FLINK-12326][python] Add basic test framework for python table api.
This closes #8347
---
.../main/flink-bin/bin/pyflink-gateway-server.sh | 17 ++-
flink-python/pyflink/find_flink_home.py | 19 ++-
flink-python/pyflink/table/tests/test_calc.py | 70 +++++++++++
.../pyflink/table/tests/test_end_to_end.py | 73 ------------
.../pyflink/testing/__init__.py | 39 +-----
flink-python/pyflink/testing/source_sink_utils.py | 131 +++++++++++++++++++++
flink-python/pyflink/testing/test_case_utils.py | 92 +++++++++++++++
.../runtime/stream/table/TableSinkITCase.scala | 23 ++--
8 files changed, 338 insertions(+), 126 deletions(-)
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
index b63a2ac..026f813 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
@@ -51,4 +51,19 @@ log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
-exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH} ${DRIVER} ${ARGS[@]}
+
+FLINK_TEST_CLASSPATH=""
+if [[ -n "$FLINK_TESTING" ]]; then
+  # Test mode: collect every flink-*-tests.jar found under the directory two levels
+  # above this script's bin/ directory (resolved to a physical path).
+  bin=`dirname "$0"`
+  FLINK_SOURCE_ROOT_DIR=`cd "$bin/../../"; pwd -P`
+
+  while read -d '' -r testJarFile ; do
+    if [[ "$FLINK_TEST_CLASSPATH" == "" ]]; then
+      FLINK_TEST_CLASSPATH="$testJarFile";
+    else
+      FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile"
+    fi
+  done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z)
+fi
+
+# Append the test classpath only when it is non-empty: a trailing ':' would create an
+# empty classpath entry, which the JVM resolves to the current working directory.
+exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}${FLINK_TEST_CLASSPATH:+:${FLINK_TEST_CLASSPATH}} ${DRIVER} ${ARGS[@]}
diff --git a/flink-python/pyflink/find_flink_home.py b/flink-python/pyflink/find_flink_home.py
index b84a7c3..0491368 100644
--- a/flink-python/pyflink/find_flink_home.py
+++ b/flink-python/pyflink/find_flink_home.py
@@ -15,7 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from __future__ import print_function
+
+import logging
import os
import sys
@@ -37,10 +38,22 @@ def _find_flink_home():
return build_target
except Exception:
pass
- print("Could not find valid FLINK_HOME(Flink distribution directory) "
- "in current environment.", file=sys.stderr)
+ logging.error("Could not find valid FLINK_HOME(Flink distribution directory) "
+ "in current environment.")
sys.exit(-1)
+def _find_flink_source_root():
+    """
+    Find the flink source root directory.
+
+    The directory is derived from this module's location (two levels above the directory
+    containing this file); it is not checked to actually be a Flink source checkout.
+    If the path cannot be resolved, the error and its traceback are logged and
+    sys.exit(-1) is called.
+    """
+    try:
+        module_dir = os.path.dirname(os.path.abspath(__file__))
+        return os.path.abspath(os.path.join(module_dir, os.pardir, os.pardir))
+    except Exception:
+        # Keep the traceback: the underlying error is the only clue why resolution failed.
+        logging.exception("Could not find valid flink source root directory "
+                          "in current environment.")
+        sys.exit(-1)
+
+
if __name__ == "__main__":
print(_find_flink_home())
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
new file mode 100644
index 0000000..70829dc
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -0,0 +1,70 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
+    """
+    Tests for basic relational operations (where/select) of the Python Table API.
+    """
+
+    def test_select(self):
+        """
+        Reads a two-row CSV source, filters with ``a > 0``, projects ``a + 1, b, c`` and
+        checks the rows collected by an append test sink.
+        """
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        # Leaving the with-block closes (and flushes) the file before the job reads it.
+        with open(source_path, 'w') as f:
+            f.write('1,hi,hello\n' + '2,hi,hello\n')
+
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+
+        t_env = self.t_env
+
+        # register Orders table in table environment
+        t_env.register_table_source(
+            "Orders",
+            CsvTableSource(source_path, field_names, field_types))
+
+        # register Results table (backed by a Java-side test sink) in table environment
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        t_env.scan("Orders") \
+            .where("a > 0") \
+            .select("a + 1, b, c") \
+            .insert_into("Results")
+
+        t_env.execute()
+
+        actual = source_sink_utils.results()
+        expected = ['2,hi,hello', '3,hi,hello']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    # Write XML reports to target/test-reports when xmlrunner is installed;
+    # otherwise unittest falls back to its default text runner.
+    try:
+        import xmlrunner
+        runner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        runner = None
+    unittest.main(testRunner=runner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_end_to_end.py b/flink-python/pyflink/table/tests/test_end_to_end.py
deleted file mode 100644
index 7edd0b0..0000000
--- a/flink-python/pyflink/table/tests/test_end_to_end.py
+++ /dev/null
@@ -1,73 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import os
-import tempfile
-
-from pyflink.find_flink_home import _find_flink_home
-from pyflink.table import TableEnvironment, TableConfig
-from pyflink.table.table_sink import CsvTableSink
-from pyflink.table.table_source import CsvTableSource
-from pyflink.table.types import DataTypes
-
-
-def test_end_to_end():
- tmp_dir = tempfile.gettempdir()
- source_path = tmp_dir + '/streaming.csv'
- if os.path.isfile(source_path):
- os.remove(source_path)
- with open(source_path, 'w') as f:
- lines = '1,hi,hello\n' + '2,hi,hello\n'
- f.write(lines)
- f.close()
- _find_flink_home()
- print("using %s as FLINK_HOME..." % os.environ["FLINK_HOME"])
-
- t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
- t_env = TableEnvironment.get_table_environment(t_config)
-
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
- # register Orders table in table environment
- t_env.register_table_source(
- "Orders",
- CsvTableSource(source_path, field_names, field_types))
-
- # register Results table in table environment
- tmp_dir = tempfile.gettempdir()
- tmp_csv = tmp_dir + '/streaming2.csv'
- if os.path.isfile(tmp_csv):
- os.remove(tmp_csv)
-
- t_env.register_table_sink(
- "Results",
- field_names, field_types, CsvTableSink(tmp_csv))
-
- t_env.scan("Orders") \
- .where("a > 0") \
- .select("a + 1, b, c") \
- .insert_into("Results")
-
- t_env.execute()
- with open(tmp_csv, 'r') as f:
- lines = f.read()
- assert lines == '2,hi,hello\n' + '3,hi,hello\n'
- print("test passed, the log file is under this directory: %s/log" % os.environ["FLINK_HOME"])
-
-if __name__ == '__main__':
- test_end_to_end()
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-python/pyflink/testing/__init__.py
similarity index 50%
copy from flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
copy to flink-python/pyflink/testing/__init__.py
index b63a2ac..e154fad 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-python/pyflink/testing/__init__.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -15,40 +14,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-################################################################################
-
-# =====================================================================
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-if [ "$FLINK_IDENT_STRING" = "" ]; then
- FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_CLASSPATH=`constructFlinkClassPath`
-
-ARGS=()
-
-while [[ $# -gt 0 ]]
-do
- key="$1"
- case $key in
- -c|--class)
- DRIVER=$2
- shift
- shift
- ;;
- *)
- ARGS+=("$1")
- shift
- ;;
- esac
-done
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-$HOSTNAME.log
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
-exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH} ${DRIVER} ${ARGS[@]}
+#################################################################################
diff --git a/flink-python/pyflink/testing/source_sink_utils.py b/flink-python/pyflink/testing/source_sink_utils.py
new file mode 100644
index 0000000..b9f28d8
--- /dev/null
+++ b/flink-python/pyflink/testing/source_sink_utils.py
@@ -0,0 +1,131 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import glob
+import os
+import sys
+import unittest
+from py4j.java_gateway import java_import
+
+from pyflink.find_flink_home import _find_flink_source_root
+from pyflink.java_gateway import get_gateway
+from pyflink.table import TableSink
+
+if sys.version_info[0] >= 3:
+    # Python 3 has no xrange; alias it to range so index loops work on Python 2 and 3.
+    xrange = range
+
+
+class TestTableSink(TableSink):
+    """
+    Base class for test table sinks that wrap Java test sinks from the
+    flink-table-planner tests jar.
+    """
+
+    # Shared by all subclasses (always read/written via TestTableSink): becomes True
+    # once the Java test classes have been imported.
+    _inited = False
+
+    def __init__(self, j_table_sink):
+        super(TestTableSink, self).__init__(j_table_sink)
+
+    @classmethod
+    def _ensure_initialized(cls):
+        """
+        Imports the Java test sink classes into the gateway JVM view on first use.
+
+        Raises unittest.SkipTest when no file matching
+        flink-table/flink-table-planner/target/flink-table-planner*-tests.jar exists under
+        the flink source root.
+        """
+        if not TestTableSink._inited:
+            test_jar_pattern = os.path.join(
+                _find_flink_source_root(),
+                "flink-table/flink-table-planner/target/"
+                "flink-table-planner*-tests.jar")
+            if len(glob.glob(test_jar_pattern)) == 0:
+                raise unittest.SkipTest(
+                    "'flink-table-planner*-tests.jar' is not available. "
+                    "Will skip the related tests.")
+
+            jvm = get_gateway().jvm
+            for java_class in ("org.apache.flink.table.runtime.stream.table.TestAppendSink",
+                               "org.apache.flink.table.runtime.stream.table.TestRetractSink",
+                               "org.apache.flink.table.runtime.stream.table.TestUpsertSink",
+                               "org.apache.flink.table.runtime.stream.table.RowCollector"):
+                java_import(jvm, java_class)
+
+            TestTableSink._inited = True
+
+
+class TestAppendSink(TestTableSink):
+    """
+    Python wrapper around the Java ``TestAppendSink`` test sink.
+    """
+
+    def __init__(self):
+        TestTableSink._ensure_initialized()
+        j_sink = get_gateway().jvm.TestAppendSink()
+        super(TestAppendSink, self).__init__(j_sink)
+
+
+class TestRetractSink(TestTableSink):
+    """
+    Python wrapper around the Java ``TestRetractSink`` test sink.
+    """
+
+    def __init__(self):
+        TestTableSink._ensure_initialized()
+        j_sink = get_gateway().jvm.TestRetractSink()
+        super(TestRetractSink, self).__init__(j_sink)
+
+
+class TestUpsertSink(TestTableSink):
+    """
+    Python wrapper around the Java ``TestUpsertSink`` test sink.
+
+    :param keys: Key field names handed to the Java ``TestUpsertSink`` constructor as a
+                 ``String[]``; any sized iterable works. ``None`` is passed through as Java
+                 ``null`` (the Java sink explicitly handles null expected keys).
+    :param is_append_only: Append-only flag handed to the Java ``TestUpsertSink``
+                           constructor.
+    """
+
+    def __init__(self, keys, is_append_only):
+        TestTableSink._ensure_initialized()
+
+        gateway = get_gateway()
+        j_keys = None
+        if keys is not None:
+            keys = list(keys)
+            # Build a Java String[] so the Scala constructor receives an Array[String].
+            j_keys = gateway.new_array(gateway.jvm.String, len(keys))
+            for i, key in enumerate(keys):
+                j_keys[i] = key
+
+        super(TestUpsertSink, self).__init__(gateway.jvm.TestUpsertSink(j_keys, is_append_only))
+
+
+def results():
+    """
+    Retrieves the results from an append table sink.
+
+    Delegates to :func:`retract_results`.
+    """
+    return retract_results()
+
+
+def retract_results():
+    """
+    Retrieves the results from a retract table sink.
+
+    Fetches the values gathered by the Java ``RowCollector`` via ``getAndClearValues`` and
+    converts them with ``RowCollector.retractResults``.
+    """
+    row_collector = get_gateway().jvm.RowCollector
+    collected = row_collector.getAndClearValues()
+    return row_collector.retractResults(collected)
+
+
+def upsert_results(keys):
+    """
+    Retrieves the results from an upsert table sink.
+
+    :param keys: Key field positions (presumably indices into each row; confirm against
+                 ``RowCollector.upsertResults``), passed to the JVM as an ``int[]``. Any sized
+                 iterable of ints works.
+    :return: The value returned by ``RowCollector.upsertResults``.
+    """
+    gateway = get_gateway()
+    keys = list(keys)
+    j_keys = gateway.new_array(gateway.jvm.int, len(keys))
+    for i, key in enumerate(keys):
+        j_keys[i] = key
+
+    # Named `collected` so it does not shadow the module-level results() function.
+    collected = gateway.jvm.RowCollector.getAndClearValues()
+    return gateway.jvm.RowCollector.upsertResults(collected, j_keys)
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
new file mode 100644
index 0000000..d04df91
--- /dev/null
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -0,0 +1,92 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+from pyflink.find_flink_home import _find_flink_home
+from pyflink.table import TableEnvironment, TableConfig
+
+if sys.version_info[0] >= 3:
+    # Python 3 has no xrange; alias it so index loops run on both Python 2 and 3.
+    xrange = range
+
+# Any non-empty VERBOSE environment variable switches the test logging to DEBUG.
+log_level = logging.DEBUG if os.getenv("VERBOSE") else logging.INFO
+logging.basicConfig(stream=sys.stdout, level=log_level,
+                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+
+
+class PyFlinkTestCase(unittest.TestCase):
+    """
+    Base class for unit tests.
+
+    Before the tests of a class run, creates a temporary directory (``cls.tempdir``),
+    sets FLINK_TESTING and calls ``_find_flink_home()``; the directory is removed afterwards.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+
+        # Read by pyflink-gateway-server.sh to put the flink-*-tests.jar files on the
+        # gateway's classpath.
+        os.environ["FLINK_TESTING"] = "1"
+        _find_flink_home()
+
+        logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    @classmethod
+    def assert_equals(cls, actual, expected):
+        """
+        Asserts that ``actual`` and ``expected`` hold the same elements, ignoring order.
+
+        :param actual: A JVM sequence accessed via ``length()``/``apply(i)``, such as the
+                       value returned by ``source_sink_utils.results()``.
+        :param expected: A list of the expected elements. It is not modified.
+        :raises AssertionError: If the sorted contents differ, including in length.
+        """
+        actual_py_list = sorted(cls.to_py_list(actual))
+        expected_py_list = sorted(expected)
+        # Compare whole lists rather than zipped pairs so that missing or extra
+        # elements also fail the assertion.
+        assert actual_py_list == expected_py_list, \
+            "%s != %s" % (actual_py_list, expected_py_list)
+
+    @classmethod
+    def to_py_list(cls, actual):
+        """
+        Copies a JVM sequence exposing ``length()`` and ``apply(i)`` into a Python list.
+        """
+        return [actual.apply(i) for i in range(actual.length())]
+
+
+class PyFlinkStreamTableTestCase(PyFlinkTestCase):
+    """
+    Base class for stream unit tests: before every test, builds ``self.t_config`` (streaming
+    execution, parallelism 4) and a matching ``self.t_env``.
+    """
+
+    def setUp(self):
+        super(PyFlinkStreamTableTestCase, self).setUp()
+        config_builder = TableConfig.Builder().as_streaming_execution()
+        self.t_config = config_builder.set_parallelism(4).build()
+        self.t_env = TableEnvironment.get_table_environment(self.t_config)
+
+
+class PyFlinkBatchTableTestCase(PyFlinkTestCase):
+    """
+    Base class for batch unit tests: before every test, builds ``self.t_config`` (batch
+    execution, parallelism 4) and a matching ``self.t_env``.
+    """
+
+    def setUp(self):
+        super(PyFlinkBatchTableTestCase, self).setUp()
+        config_builder = TableConfig.Builder().as_batch_execution()
+        self.t_config = config_builder.set_parallelism(4).build()
+        self.t_env = TableEnvironment.get_table_environment(self.t_config)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index c5c23cb..dbfdb14 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -680,18 +680,19 @@ private[flink] class TestUpsertSink(
override def setKeyFields(keys: Array[String]): Unit =
if (keys != null) {
- assertEquals("Provided key fields do not match expected keys",
- expectedKeys.sorted.mkString(","),
- keys.sorted.mkString(","))
+      // expectedKeys may be null (the else branch checks for it), so compare null-safely
+      // instead of dereferencing it, and report both key sets on mismatch.
+      val expectedKeyString = Option(expectedKeys).map(_.sorted.mkString(",")).orNull
+      val providedKeyString = keys.sorted.mkString(",")
+      if (expectedKeyString != providedKeyString) {
+        throw new AssertionError("Provided key fields do not match expected keys. " +
+          s"Expected: <$expectedKeyString>, provided: <$providedKeyString>")
+      }
} else {
- assertNull("Provided key fields should not be null.", expectedKeys)
+      // No keys were provided although keys are expected: report which ones.
+      if (expectedKeys != null) {
+        throw new AssertionError("Provided key fields should not be null. " +
+          s"Expected: <${expectedKeys.sorted.mkString(",")}>")
+      }
}
override def setIsAppendOnly(isAppendOnly: JBool): Unit =
- assertEquals(
- "Provided isAppendOnly does not match expected isAppendOnly",
- expectedIsAppendOnly,
- isAppendOnly)
+    if (expectedIsAppendOnly != isAppendOnly) {
+      // Include both values so a failing test shows what the planner actually derived.
+      throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly. " +
+        s"Expected: <$expectedIsAppendOnly>, provided: <$isAppendOnly>")
+    }
override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
@@ -749,9 +750,9 @@ object RowCollector {
}
}.filter{ case (_, c: Int) => c != 0 }
- assertFalse(
- "Received retracted rows which have not been accumulated.",
- retracted.exists{ case (_, c: Int) => c < 0})
+ if (retracted.exists{ case (_, c: Int) => c < 0}) {
+ throw new AssertionError("Received retracted rows which have not been accumulated.")
+ }
retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
}