Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/22 10:21:38 UTC
[flink] branch master updated: [FLINK-12439][python] Add FileSystem
Connector with CSV format support in Python Table API
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e16fa9f [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
e16fa9f is described below
commit e16fa9fe1506ec725f5c5abb2b24afa246e17dae
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Tue May 21 16:25:16 2019 +0800
[FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
Brief change log:
- Add all of the existing descriptor interfaces, aligned with the Java Table API.
- Add FileSystem connector and OldCsv format support (a usage sketch follows below).
- The `schema(..)` of OldCsv will be added in FLINK-12588.
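For context, here is a minimal usage sketch of the new API (it mirrors the tests added in
this commit; `table_env` and the file path are placeholders):

    table_env.connect(FileSystem().path("/path/to/input.csv")) \
        .with_format(OldCsv()
                     .field_delimiter(',')
                     .field("a", "INT")
                     .field("b", "VARCHAR")) \
        .with_schema(Schema()
                     .field("a", "INT")
                     .field("b", "VARCHAR")) \
        .register_table_source("MyTable")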
This closes #8488
---
flink-python/pyflink/java_gateway.py | 1 +
flink-python/pyflink/table/__init__.py | 5 +
flink-python/pyflink/table/table_descriptor.py | 510 +++++++++++++++++
flink-python/pyflink/table/table_environment.py | 91 +++
.../pyflink/table/tests/test_descriptor.py | 618 +++++++++++++++++++++
.../table/tests/test_environment_completeness.py | 2 +-
6 files changed, 1226 insertions(+), 1 deletion(-)
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index ed1dc89..e5c8330 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -101,6 +101,7 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.flink.table.api.*")
java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
+ java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index dcbc0ab..4ea3b3f 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,6 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
from pyflink.table.table_source import TableSource, CsvTableSource
from pyflink.table.types import DataTypes
from pyflink.table.window import Tumble, Session, Slide, Over
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem
__all__ = [
'TableEnvironment',
@@ -56,4 +57,8 @@ __all__ = [
'Session',
'Slide',
'Over',
+ 'Rowtime',
+ 'Schema',
+ 'OldCsv',
+ 'FileSystem',
]
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
new file mode 100644
index 0000000..ed9a157
--- /dev/null
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -0,0 +1,510 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_method
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.type_utils import to_java_type
+
+if sys.version >= '3':
+ unicode = str
+
+__all__ = [
+ 'Rowtime',
+ 'Schema',
+ 'OldCsv',
+ 'FileSystem'
+]
+
+
+class Descriptor(object):
+ """
+ Base class of the descriptors that adds a set of string-based, normalized properties for
+ describing DDL information.
+
+ Typical characteristics of a descriptor are:
+ - descriptors have a default constructor
+ - descriptors themselves contain very little logic
+ - corresponding validators validate the correctness (goal: have a single point of validation)
+
+ A descriptor is similar to a builder in a builder pattern, and is thus mutable for
+ building properties.
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, j_descriptor):
+ self._j_descriptor = j_descriptor
+
+ def to_properties(self):
+ """
+ Converts this descriptor into a dict of properties.
+
+ :return: A dict containing all of the current properties.
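+
+ Example (a sketch; the exact properties are asserted in the tests added in this commit):
+ ::
+ >>> FileSystem().path("/test.csv").to_properties()
+ # -> {'connector.type': 'filesystem', 'connector.path': '/test.csv',
+ # 'connector.property-version': '1'}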
+ """
+ return dict(self._j_descriptor.toProperties())
+
+
+class Rowtime(Descriptor):
+ """
+ Rowtime descriptor for describing an event time attribute in the schema.
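+
+ Example (a sketch; both calls are exercised by the tests added in this commit):
+ ::
+ >>> Rowtime().timestamps_from_field("rtime").watermarks_periodic_bounded(1000)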
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_rowtime = gateway.jvm.Rowtime()
+ super(Rowtime, self).__init__(self._j_rowtime)
+
+ def timestamps_from_field(self, field_name):
+ """
+ Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into
+ the rowtime attribute.
+
+ :param field_name: The field to convert into a rowtime attribute.
+ :return: This rowtime descriptor.
+ """
+ self._j_rowtime = self._j_rowtime.timestampsFromField(field_name)
+ return self
+
+ def timestamps_from_source(self):
+ """
+ Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream
+ API record into the rowtime attribute and thus preserves the assigned timestamps from the
+ source.
+
+ .. note::
+ This extractor only works in streaming environments.
+
+ :return: This rowtime descriptor.
+ """
+ self._j_rowtime = self._j_rowtime.timestampsFromSource()
+ return self
+
+ def timestamps_from_extractor(self, extractor):
+ """
+ Sets a custom timestamp extractor to be used for the rowtime attribute.
+
+ :param extractor: The java canonical class name of the TimestampExtractor to extract the
+ rowtime attribute from the physical type. The TimestampExtractor must
+ have a public no-argument constructor and must be loadable by the
+ current Java classloader.
+ :return: This rowtime descriptor.
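+
+ Example (a sketch; the extractor class below is the one used in this commit's tests):
+ ::
+ >>> Rowtime().timestamps_from_extractor(
+ ... "org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor")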
+ """
+ gateway = get_gateway()
+ self._j_rowtime = self._j_rowtime.timestampsFromExtractor(
+ gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(extractor)
+ .newInstance())
+ return self
+
+ def watermarks_periodic_ascending(self):
+ """
+ Sets a built-in watermark strategy for ascending rowtime attributes.
+
+ Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a
+ timestamp equal to the max timestamp are not late.
+
+ :return: This rowtime descriptor.
+ """
+ self._j_rowtime = self._j_rowtime.watermarksPeriodicAscending()
+ return self
+
+ def watermarks_periodic_bounded(self, delay):
+ """
+ Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a
+ bounded time interval.
+
+ Emits watermarks which are the maximum observed timestamp minus the specified delay.
+
+ :param delay: Delay in milliseconds.
+ :return: This rowtime descriptor.
+ """
+ self._j_rowtime = self._j_rowtime.watermarksPeriodicBounded(delay)
+ return self
+
+ def watermarks_from_source(self):
+ """
+ Sets a built-in watermark strategy which indicates the watermarks should be preserved from
+ the underlying DataStream API and thus preserves the assigned watermarks from the source.
+
+ :return: This rowtime descriptor.
+ """
+ self._j_rowtime = self._j_rowtime.watermarksFromSource()
+ return self
+
+ def watermarks_from_strategy(self, strategy):
+ """
+ Sets a custom watermark strategy to be used for the rowtime attribute.
+
+ :param strategy: The java canonical class name of the WatermarkStrategy. The
+ WatermarkStrategy must have a public no-argument constructor and must be
+ loadable by the current Java classloader.
+ :return: This rowtime descriptor.
+ """
+ gateway = get_gateway()
+ self._j_rowtime = self._j_rowtime.watermarksFromStrategy(
+ gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(strategy)
+ .newInstance())
+ return self
+
+
+class Schema(Descriptor):
+ """
+ Describes a schema of a table.
+
+ .. note::
+ Field names are matched by the exact name by default (case sensitive).
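+
+ Example (a sketch; this chain mirrors the tests added in this commit):
+ ::
+ >>> Schema()\
+ ... .field("long_field", DataTypes.LONG).from_origin_field("origin_field_a")\
+ ... .field("string_field", DataTypes.STRING)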
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_schema = gateway.jvm.Schema()
+ super(Schema, self).__init__(self._j_schema)
+
+ def field(self, field_name, field_type):
+ """
+ Adds a field with the field name and the data type or type string. Required.
+ This method can be called multiple times. The call order of this method also defines
+ the order of the fields in a row. The supported type strings are documented at:
+ https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#type-strings
+
+ :param field_name: The field name.
+ :param field_type: The data type or type string of the field.
+ :return: This schema object.
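+
+ Example (a sketch showing both a data type and a type string, as in the tests below):
+ ::
+ >>> Schema().field("int_field", DataTypes.INT).field("ts_field", "SQL_TIMESTAMP")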
+ """
+ if isinstance(field_type, (str, unicode)):
+ self._j_schema = self._j_schema.field(field_name, field_type)
+ else:
+ self._j_schema = self._j_schema.field(field_name, to_java_type(field_type))
+ return self
+
+ def from_origin_field(self, origin_field_name):
+ """
+ Specifies the origin of the previously defined field. The origin field is defined by a
+ connector or format.
+
+ E.g. field("myString", Types.STRING).from_origin_field("CSV_MY_STRING")
+
+ .. note::
+ Field names are matched by the exact name by default (case sensitive).
+
+ :param origin_field_name: The origin field name.
+ :return: This schema object.
+ """
+ self._j_schema = get_method(self._j_schema, "from")(origin_field_name)
+ return self
+
+ def proctime(self):
+ """
+ Specifies the previously defined field as a processing-time attribute.
+
+ E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
+
+ :return: This schema object.
+ """
+ self._j_schema = self._j_schema.proctime()
+ return self
+
+ def rowtime(self, rowtime):
+ """
+ Specifies the previously defined field as an event-time attribute.
+
+ E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
+
+ :param rowtime: A :class:`Rowtime` descriptor.
+ :return: This schema object.
+ """
+ self._j_schema = self._j_schema.rowtime(rowtime._j_rowtime)
+ return self
+
+
+class FormatDescriptor(Descriptor):
+ """
+ Describes the format of data.
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, j_format_descriptor):
+ self._j_format_descriptor = j_format_descriptor
+ super(FormatDescriptor, self).__init__(self._j_format_descriptor)
+
+
+class OldCsv(FormatDescriptor):
+ """
+ Format descriptor for comma-separated values (CSV).
+
+ .. note::
+ This descriptor describes Flink's non-standard CSV table source/sink. In the future, the
+ descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv`
+ format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
+ the old one for stream/batch filesystem operations for now.
+
+ .. note::
+ Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka.
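+
+ Example (a minimal sketch, as exercised by the tests added in this commit):
+ ::
+ >>> OldCsv().field_delimiter(',').field("a", DataTypes.INT).field("b", "VARCHAR")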
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_csv = gateway.jvm.OldCsv()
+ super(OldCsv, self).__init__(self._j_csv)
+
+ def field_delimiter(self, delimiter):
+ """
+ Sets the field delimiter, "," by default.
+
+ :param delimiter: The field delimiter.
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.fieldDelimiter(delimiter)
+ return self
+
+ def line_delimiter(self, delimiter):
+ """
+ Sets the line delimiter, "\n" by default.
+
+ :param delimiter: The line delimiter.
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.lineDelimiter(delimiter)
+ return self
+
+ def field(self, field_name, field_type):
+ """
+ Adds a format field with the field name and the data type or type string. Required.
+ This method can be called multiple times. The call order of this method also defines
+ the order of the fields in the format.
+
+ :param field_name: The field name.
+ :param field_type: The data type or type string of the field.
+ :return: This :class:`OldCsv` object.
+ """
+ if isinstance(field_type, (str, unicode)):
+ self._j_csv = self._j_csv.field(field_name, field_type)
+ else:
+ self._j_csv = self._j_csv.field(field_name, to_java_type(field_type))
+ return self
+
+ def quote_character(self, quote_character):
+ """
+ Sets a quote character for String values, null by default.
+
+ :param quote_character: The quote character.
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.quoteCharacter(quote_character)
+ return self
+
+ def comment_prefix(self, prefix):
+ """
+ Sets a prefix to indicate comments, null by default.
+
+ :param prefix: The prefix to indicate comments.
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.commentPrefix(prefix)
+ return self
+
+ def ignore_parse_errors(self):
+ """
+ Skips records with parse errors instead of failing. By default, an exception is thrown.
+
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.ignoreParseErrors()
+ return self
+
+ def ignore_first_line(self):
+ """
+ Ignores the first line. By default, the first line is not skipped.
+
+ :return: This :class:`OldCsv` object.
+ """
+ self._j_csv = self._j_csv.ignoreFirstLine()
+ return self
+
+
+class ConnectorDescriptor(Descriptor):
+ """
+ Describes a connector to another system.
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, j_connector_descriptor):
+ self._j_connector_descriptor = j_connector_descriptor
+ super(ConnectorDescriptor, self).__init__(self._j_connector_descriptor)
+
+
+class FileSystem(ConnectorDescriptor):
+ """
+ Connector descriptor for a file system.
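+
+ Example (a minimal sketch; the path is a placeholder):
+ ::
+ >>> FileSystem().path("/path/to/file.csv")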
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_file_system = gateway.jvm.FileSystem()
+ super(FileSystem, self).__init__(self._j_file_system)
+
+ def path(self, path_str):
+ """
+ Sets the path to a file or directory in a file system.
+
+ :param path_str: The path of a file or directory.
+ :return: This :class:`FileSystem` object.
+ """
+ self._j_file_system = self._j_file_system.path(path_str)
+ return self
+
+
+class ConnectTableDescriptor(Descriptor):
+ """
+ Common class for tables created with :class:`pyflink.table.TableEnvironment.connect`.
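+
+ Example (a sketch of the typical chain; `table_env` is assumed to be a TableEnvironment,
+ and the chain mirrors the tests added in this commit):
+ ::
+ >>> table_env.connect(FileSystem().path("/path/to/csv"))\
+ ... .with_format(OldCsv().field("a", "INT"))\
+ ... .with_schema(Schema().field("a", "INT"))\
+ ... .register_table_source("MyTable")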
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, j_table_descriptor):
+ self._j_table_descriptor = j_table_descriptor
+ super(ConnectTableDescriptor, self).__init__(self._j_table_descriptor)
+
+ def with_format(self, format_descriptor):
+ """
+ Specifies the format that defines how to read data from a connector.
+
+ :param format_descriptor: The :class:`FormatDescriptor` for the resulting table,
+ e.g. :class:`OldCsv`.
+ :return: This object.
+ """
+ self._j_table_descriptor = \
+ self._j_table_descriptor.withFormat(format_descriptor._j_format_descriptor)
+ return self
+
+ def with_schema(self, schema):
+ """
+ Specifies the resulting table schema.
+
+ :param schema: The :class:`Schema` object for the resulting table.
+ :return: This object.
+ """
+ self._j_table_descriptor = self._j_table_descriptor.withSchema(schema._j_schema)
+ return self
+
+ def register_table_sink(self, name):
+ """
+ Searches for the specified table sink, configures it accordingly, and registers it as
+ a table under the given name.
+
+ :param name: Table name to be registered in the table environment.
+ :return: This object.
+ """
+ self._j_table_descriptor = self._j_table_descriptor.registerTableSink(name)
+ return self
+
+ def register_table_source(self, name):
+ """
+ Searches for the specified table source, configures it accordingly, and registers it as
+ a table under the given name.
+
+ :param name: Table name to be registered in the table environment.
+ :return: This object.
+ """
+ self._j_table_descriptor = self._j_table_descriptor.registerTableSource(name)
+ return self
+
+ def register_table_source_and_sink(self, name):
+ """
+ Searches for the specified table source and sink, configures them accordingly, and
+ registers them as a table under the given name.
+
+ :param name: Table name to be registered in the table environment.
+ :return: This object.
+ """
+ self._j_table_descriptor = self._j_table_descriptor.registerTableSourceAndSink(name)
+ return self
+
+
+class StreamTableDescriptor(ConnectTableDescriptor):
+ """
+ Descriptor for specifying a table source and/or sink in a streaming environment.
+ """
+
+ def __init__(self, j_stream_table_descriptor):
+ self._j_stream_table_descriptor = j_stream_table_descriptor
+ super(StreamTableDescriptor, self).__init__(self._j_stream_table_descriptor)
+
+ def in_append_mode(self):
+ """
+ Declares how to perform the conversion between a dynamic table and an external connector.
+
+ In append mode, a dynamic table and an external connector only exchange INSERT messages.
+
+ :return: This object.
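+
+ Example (a sketch; the resulting 'update-mode' property value is asserted in the
+ tests added in this commit):
+ ::
+ >>> table_env.connect(FileSystem()).with_format(OldCsv()).in_append_mode()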
+ """
+ self._j_stream_table_descriptor = self._j_stream_table_descriptor.inAppendMode()
+ return self
+
+ def in_retract_mode(self):
+ """
+ Declares how to perform the conversion between a dynamic table and an external connector.
+
+ In retract mode, a dynamic table and an external connector exchange ADD and RETRACT
+ messages.
+
+ An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+ UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+ the updating (new) row.
+
+ In contrast to upsert mode, no unique key is defined in this mode. However, every update
+ consists of two messages, which is less efficient.
+
+ :return: This object.
+ """
+ self._j_stream_table_descriptor = self._j_stream_table_descriptor.inRetractMode()
+ return self
+
+ def in_upsert_mode(self):
+ """
+ Declares how to perform the conversion between a dynamic table and an external connector.
+
+ In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE
+ messages.
+
+ This mode requires a (possibly composite) unique key by which updates can be propagated. The
+ external connector needs to be aware of the unique key attribute in order to apply messages
+ correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE changes
+ as DELETE messages.
+
+ The main difference to a retract stream is that UPDATE changes are encoded with a single
+ message and are therefore more efficient.
+
+ :return: This object.
+ """
+ self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode()
+ return self
+
+
+class BatchTableDescriptor(ConnectTableDescriptor):
+ """
+ Descriptor for specifying a table source and/or sink in a batch environment.
+ """
+
+ def __init__(self, j_batch_table_descriptor):
+ self.j_batch_table_descriptor = j_batch_table_descriptor
+ super(BatchTableDescriptor, self).__init__(self.j_batch_table_descriptor)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 5b9886d..dc77b04 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -20,6 +20,8 @@ from abc import ABCMeta, abstractmethod
from pyflink.table.query_config import StreamQueryConfig, BatchQueryConfig, QueryConfig
from pyflink.table.table_config import TableConfig
+from pyflink.table.table_descriptor import (StreamTableDescriptor, ConnectorDescriptor,
+ BatchTableDescriptor)
from pyflink.java_gateway import get_gateway
from pyflink.table import Table
@@ -207,6 +209,35 @@ class TableEnvironment(object):
def query_config(self):
pass
+ @abstractmethod
+ def connect(self, connector_descriptor):
+ """
+ Creates a table source and/or table sink from a descriptor.
+
+ Descriptors allow for declaring the communication to external systems in an
+ implementation-agnostic way. The classpath is scanned for suitable table factories that
+ match the desired configuration.
+
+ The following example shows how to read from a connector using a JSON format and
+ register a table source as "MyTable":
+ ::
+ >>> table_env\
+ ... .connect(ExternalSystemXYZ()
+ ... .version("0.11"))\
+ ... .with_format(Json()
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False))\
+ ... .with_schema(Schema()
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL"))\
+ ... .register_table_source("MyTable")
+
+ :param connector_descriptor: Connector descriptor describing the external system.
+ :return: A :class:`ConnectTableDescriptor` used to build the table source/sink.
+ """
+ pass
+
@classmethod
def create(cls, table_config):
"""
@@ -261,6 +292,36 @@ class StreamTableEnvironment(TableEnvironment):
"""
return StreamQueryConfig(self._j_tenv.queryConfig())
+ def connect(self, connector_descriptor):
+ """
+ Creates a table source and/or table sink from a descriptor.
+
+ Descriptors allow for declaring the communication to external systems in an
+ implementation-agnostic way. The classpath is scanned for suitable table factories that
+ match the desired configuration.
+
+ The following example shows how to read from a connector using a JSON format and
+ register a table source as "MyTable":
+ ::
+ >>> table_env\
+ ... .connect(ExternalSystemXYZ()
+ ... .version("0.11"))\
+ ... .with_format(Json()
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False))\
+ ... .with_schema(Schema()
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL"))\
+ ... .register_table_source("MyTable")
+
+ :param connector_descriptor: Connector descriptor describing the external system.
+ :return: A :class:`StreamTableDescriptor` used to build the table source/sink.
+ """
+ # type: (ConnectorDescriptor) -> StreamTableDescriptor
+ return StreamTableDescriptor(
+ self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+
class BatchTableEnvironment(TableEnvironment):
@@ -288,3 +349,33 @@ class BatchTableEnvironment(TableEnvironment):
:return: A new :class:`BatchQueryConfig`.
"""
return BatchQueryConfig(self._j_tenv.queryConfig())
+
+ def connect(self, connector_descriptor):
+ """
+ Creates a table source and/or table sink from a descriptor.
+
+ Descriptors allow for declaring the communication to external systems in an
+ implementation-agnostic way. The classpath is scanned for suitable table factories that
+ match the desired configuration.
+
+ The following example shows how to read from a connector using a JSON format and
+ register a table source as "MyTable":
+ ::
+ >>> table_env\
+ ... .connect(ExternalSystemXYZ()
+ ... .version("0.11"))\
+ ... .with_format(Json()
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False))\
+ ... .with_schema(Schema()
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL"))\
+ ... .register_table_source("MyTable")
+
+ :param connector_descriptor: Connector descriptor describing the external system.
+ :return: A :class:`BatchTableDescriptor` used to build the table source/sink.
+ """
+ # type: (ConnectorDescriptor) -> BatchTableDescriptor
+ return BatchTableDescriptor(
+ self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
new file mode 100644
index 0000000..f2c2976
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -0,0 +1,618 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema)
+from pyflink.table.table_sink import CsvTableSink
+from pyflink.table.types import DataTypes
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
+ PyFlinkBatchTableTestCase)
+
+
+class FileSystemDescriptorTests(PyFlinkTestCase):
+
+ def test_path(self):
+ file_system = FileSystem()
+
+ file_system.path("/test.csv")
+
+ properties = file_system.to_properties()
+ expected = {'connector.property-version': '1',
+ 'connector.type': 'filesystem',
+ 'connector.path': '/test.csv'}
+ assert properties == expected
+
+
+class OldCsvDescriptorTests(PyFlinkTestCase):
+
+ def test_field_delimiter(self):
+ csv = OldCsv()
+
+ csv.field_delimiter("|")
+
+ properties = csv.to_properties()
+ expected = {'format.field-delimiter': '|',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_line_delimiter(self):
+ csv = OldCsv()
+
+ csv.line_delimiter(";")
+
+ expected = {'format.type': 'csv',
+ 'format.property-version': '1',
+ 'format.line-delimiter': ';'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_ignore_parse_errors(self):
+ csv = OldCsv()
+
+ csv.ignore_parse_errors()
+
+ properties = csv.to_properties()
+ expected = {'format.ignore-parse-errors': 'true',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_quote_character(self):
+ csv = OldCsv()
+
+ csv.quote_character("*")
+
+ properties = csv.to_properties()
+ expected = {'format.quote-character': '*',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_comment_prefix(self):
+ csv = OldCsv()
+
+ csv.comment_prefix("#")
+
+ properties = csv.to_properties()
+ expected = {'format.comment-prefix': '#',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_ignore_first_line(self):
+ csv = OldCsv()
+
+ csv.ignore_first_line()
+
+ properties = csv.to_properties()
+ expected = {'format.ignore-first-line': 'true',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_field(self):
+ csv = OldCsv()
+
+ csv.field("a", DataTypes.LONG)
+ csv.field("b", DataTypes.STRING)
+ csv.field("c", "SQL_TIMESTAMP")
+
+ properties = csv.to_properties()
+ expected = {'format.fields.0.name': 'a',
+ 'format.fields.0.type': 'BIGINT',
+ 'format.fields.1.name': 'b',
+ 'format.fields.1.type': 'VARCHAR',
+ 'format.fields.2.name': 'c',
+ 'format.fields.2.type': 'SQL_TIMESTAMP',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+
+class RowTimeDescriptorTests(PyFlinkTestCase):
+
+ def test_timestamps_from_field(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.timestamps_from_field("rtime")
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.timestamps.type': 'from-field', 'rowtime.timestamps.from': 'rtime'}
+ assert properties == expect
+
+ def test_timestamps_from_source(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.timestamps_from_source()
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.timestamps.type': 'from-source'}
+ assert properties == expect
+
+ def test_timestamps_from_extractor(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.timestamps_from_extractor(
+ "org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor")
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.timestamps.type': 'custom',
+ 'rowtime.timestamps.class':
+ 'org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor',
+ 'rowtime.timestamps.serialized':
+ 'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
+ 'vbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLm'
+ 'FwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1'
+ 'Y6piFNsGAIAAHhwdAACdHM'}
+ assert properties == expect
+
+ def test_watermarks_periodic_ascending(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.watermarks_periodic_ascending()
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.watermarks.type': 'periodic-ascending'}
+ assert properties == expect
+
+ def test_watermarks_periodic_bounded(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.watermarks_periodic_bounded(1000)
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.watermarks.type': 'periodic-bounded',
+ 'rowtime.watermarks.delay': '1000'}
+ assert properties == expect
+
+ def test_watermarks_from_source(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.watermarks_from_source()
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.watermarks.type': 'from-source'}
+ assert properties == expect
+
+ def test_watermarks_from_strategy(self):
+ rowtime = Rowtime()
+
+ rowtime = rowtime.watermarks_from_strategy(
+ "org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner")
+
+ properties = rowtime.to_properties()
+ expect = {'rowtime.watermarks.type': 'custom',
+ 'rowtime.watermarks.class':
+ 'org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner',
+ 'rowtime.watermarks.serialized':
+ 'rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
+ 'vbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3'
+ 'RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY'
+ '2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OW'
+ 'aT4CAAB4cA'}
+ assert properties == expect
+
+
+class SchemaDescriptorTests(PyFlinkTestCase):
+
+ def test_field(self):
+ schema = Schema()
+
+ schema = schema\
+ .field("int_field", DataTypes.INT)\
+ .field("long_field", DataTypes.LONG)\
+ .field("string_field", DataTypes.STRING)\
+ .field("timestamp_field", DataTypes.TIMESTAMP)\
+ .field("time_field", DataTypes.TIME)\
+ .field("date_field", DataTypes.DATE)\
+ .field("double_field", DataTypes.DOUBLE)\
+ .field("float_field", DataTypes.FLOAT)\
+ .field("byte_field", DataTypes.BYTE)\
+ .field("short_field", DataTypes.SHORT)\
+ .field("boolean_field", DataTypes.BOOLEAN)
+
+ properties = schema.to_properties()
+ expected = {'schema.0.name': 'int_field',
+ 'schema.0.type': 'INT',
+ 'schema.1.name': 'long_field',
+ 'schema.1.type': 'BIGINT',
+ 'schema.2.name': 'string_field',
+ 'schema.2.type': 'VARCHAR',
+ 'schema.3.name': 'timestamp_field',
+ 'schema.3.type': 'TIMESTAMP',
+ 'schema.4.name': 'time_field',
+ 'schema.4.type': 'TIME',
+ 'schema.5.name': 'date_field',
+ 'schema.5.type': 'DATE',
+ 'schema.6.name': 'double_field',
+ 'schema.6.type': 'DOUBLE',
+ 'schema.7.name': 'float_field',
+ 'schema.7.type': 'FLOAT',
+ 'schema.8.name': 'byte_field',
+ 'schema.8.type': 'TINYINT',
+ 'schema.9.name': 'short_field',
+ 'schema.9.type': 'SMALLINT',
+ 'schema.10.name': 'boolean_field',
+ 'schema.10.type': 'BOOLEAN'}
+ assert properties == expected
+
+ def test_field_in_string(self):
+ schema = Schema()
+
+ schema = schema\
+ .field("int_field", 'INT')\
+ .field("long_field", 'BIGINT')\
+ .field("string_field", 'VARCHAR')\
+ .field("timestamp_field", 'SQL_TIMESTAMP')\
+ .field("time_field", 'SQL_TIME')\
+ .field("date_field", 'SQL_DATE')\
+ .field("double_field", 'DOUBLE')\
+ .field("float_field", 'FLOAT')\
+ .field("byte_field", 'TINYINT')\
+ .field("short_field", 'SMALLINT')\
+ .field("boolean_field", 'BOOLEAN')
+
+ properties = schema.to_properties()
+ expected = {'schema.0.name': 'int_field',
+ 'schema.0.type': 'INT',
+ 'schema.1.name': 'long_field',
+ 'schema.1.type': 'BIGINT',
+ 'schema.2.name': 'string_field',
+ 'schema.2.type': 'VARCHAR',
+ 'schema.3.name': 'timestamp_field',
+ 'schema.3.type': 'SQL_TIMESTAMP',
+ 'schema.4.name': 'time_field',
+ 'schema.4.type': 'SQL_TIME',
+ 'schema.5.name': 'date_field',
+ 'schema.5.type': 'SQL_DATE',
+ 'schema.6.name': 'double_field',
+ 'schema.6.type': 'DOUBLE',
+ 'schema.7.name': 'float_field',
+ 'schema.7.type': 'FLOAT',
+ 'schema.8.name': 'byte_field',
+ 'schema.8.type': 'TINYINT',
+ 'schema.9.name': 'short_field',
+ 'schema.9.type': 'SMALLINT',
+ 'schema.10.name': 'boolean_field',
+ 'schema.10.type': 'BOOLEAN'}
+ assert properties == expected
+
+ def test_from_origin_field(self):
+ schema = Schema()
+
+ schema = schema\
+ .field("int_field", DataTypes.INT)\
+ .field("long_field", DataTypes.LONG).from_origin_field("origin_field_a")\
+ .field("string_field", DataTypes.STRING)
+
+ properties = schema.to_properties()
+ expected = {'schema.0.name': 'int_field',
+ 'schema.0.type': 'INT',
+ 'schema.1.name': 'long_field',
+ 'schema.1.type': 'BIGINT',
+ 'schema.1.from': 'origin_field_a',
+ 'schema.2.name': 'string_field',
+ 'schema.2.type': 'VARCHAR'}
+ assert properties == expected
+
+ def test_proctime(self):
+ schema = Schema()
+
+ schema = schema\
+ .field("int_field", DataTypes.INT)\
+ .field("ptime", DataTypes.LONG).proctime()\
+ .field("string_field", DataTypes.STRING)
+
+ properties = schema.to_properties()
+ expected = {'schema.0.name': 'int_field',
+ 'schema.0.type': 'INT',
+ 'schema.1.name': 'ptime',
+ 'schema.1.type': 'BIGINT',
+ 'schema.1.proctime': 'true',
+ 'schema.2.name': 'string_field',
+ 'schema.2.type': 'VARCHAR'}
+ assert properties == expected
+
+ def test_rowtime(self):
+ schema = Schema()
+
+ schema = schema\
+ .field("int_field", DataTypes.INT)\
+ .field("long_field", DataTypes.LONG)\
+ .field("rtime", DataTypes.LONG)\
+ .rowtime(
+ Rowtime().timestamps_from_field("long_field").watermarks_periodic_bounded(5000))\
+ .field("string_field", DataTypes.STRING)
+
+ properties = schema.to_properties()
+ expected = {'schema.0.name': 'int_field',
+ 'schema.0.type': 'INT',
+ 'schema.1.name': 'long_field',
+ 'schema.1.type': 'BIGINT',
+ 'schema.2.name': 'rtime',
+ 'schema.2.type': 'BIGINT',
+ 'schema.2.rowtime.timestamps.type': 'from-field',
+ 'schema.2.rowtime.timestamps.from': 'long_field',
+ 'schema.2.rowtime.watermarks.type': 'periodic-bounded',
+ 'schema.2.rowtime.watermarks.delay': '5000',
+ 'schema.3.name': 'string_field',
+ 'schema.3.type': 'VARCHAR'}
+ assert properties == expected
+
+
+class AbstractTableDescriptorTests(object):
+
+ def test_with_format(self):
+ descriptor = self.t_env.connect(FileSystem())
+
+ descriptor.with_format(OldCsv().field("a", "INT"))
+
+ properties = descriptor.to_properties()
+
+ expected = {'format.type': 'csv',
+ 'format.property-version': '1',
+ 'format.fields.0.name': 'a',
+ 'format.fields.0.type': 'INT',
+ 'connector.property-version': '1',
+ 'connector.type': 'filesystem'}
+ assert properties == expected
+
+ def test_with_schema(self):
+ descriptor = self.t_env.connect(FileSystem())
+
+ descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
+
+ properties = descriptor.to_properties()
+ expected = {'schema.0.name': 'a',
+ 'schema.0.type': 'INT',
+ 'format.type': 'csv',
+ 'format.property-version': '1',
+ 'connector.type': 'filesystem',
+ 'connector.property-version': '1'}
+ assert properties == expected
+
+ def test_register_table_sink(self):
+ source_path = os.path.join(self.tempdir, 'streaming.csv')
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+ data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+ csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+ t_env = self.t_env
+ t_env.register_table_source("source", csv_source)
+ # connect sink
+ sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+ if os.path.isfile(sink_path):
+ os.remove(sink_path)
+
+ t_env.connect(FileSystem().path(sink_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .with_schema(Schema()
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .register_table_sink("sink")
+ t_env.scan("source") \
+ .select("a + 1, b, c") \
+ .insert_into("sink")
+ t_env.execute()
+
+ with open(sink_path, 'r') as f:
+ lines = f.read()
+ assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
+
+ def test_register_table_source(self):
+ source_path = os.path.join(self.tempdir, 'streaming.csv')
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+ data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+ self.prepare_csv_source(source_path, data, field_types, field_names)
+ t_env = self.t_env
+ sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+ if os.path.isfile(sink_path):
+ os.remove(sink_path)
+ t_env.register_table_sink(
+ "sink",
+ field_names, field_types, CsvTableSink(sink_path))
+
+ # connect source
+ t_env.connect(FileSystem().path(source_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .with_schema(Schema()
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .register_table_source("source")
+ t_env.scan("source") \
+ .select("a + 1, b, c") \
+ .insert_into("sink")
+ t_env.execute()
+
+ with open(sink_path, 'r') as f:
+ lines = f.read()
+ assert lines == '2,Hi,Hello\n' + '3,Hello,Hello\n'
+
+ def test_register_table_source_and_sink(self):
+ source_path = os.path.join(self.tempdir, 'streaming.csv')
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+ data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+ self.prepare_csv_source(source_path, data, field_types, field_names)
+ sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+ if os.path.isfile(sink_path):
+ os.remove(sink_path)
+ t_env = self.t_env
+
+ t_env.connect(FileSystem().path(source_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .with_schema(Schema()
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .register_table_source_and_sink("source")
+ t_env.connect(FileSystem().path(sink_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .with_schema(Schema()
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .register_table_source_and_sink("sink")
+ t_env.scan("source") \
+ .select("a + 1, b, c") \
+ .insert_into("sink")
+ t_env.execute()
+
+ with open(sink_path, 'r') as f:
+ lines = f.read()
+ assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
+
+
+class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescriptorTests):
+
+ def test_in_append_mode(self):
+ descriptor = self.t_env.connect(FileSystem())
+
+ descriptor\
+ .with_format(OldCsv())\
+ .in_append_mode()
+
+ properties = descriptor.to_properties()
+ expected = {'update-mode': 'append',
+ 'format.type': 'csv',
+ 'format.property-version': '1',
+ 'connector.property-version': '1',
+ 'connector.type': 'filesystem'}
+ assert properties == expected
+
+ def test_in_retract_mode(self):
+ descriptor = self.t_env.connect(FileSystem())
+
+ descriptor \
+ .with_format(OldCsv()) \
+ .in_retract_mode()
+
+ properties = descriptor.to_properties()
+ expected = {'update-mode': 'retract',
+ 'format.type': 'csv',
+ 'format.property-version': '1',
+ 'connector.property-version': '1',
+ 'connector.type': 'filesystem'}
+ assert properties == expected
+
+ def test_in_upsert_mode(self):
+ descriptor = self.t_env.connect(FileSystem())
+
+ descriptor \
+ .with_format(OldCsv()) \
+ .in_upsert_mode()
+
+ properties = descriptor.to_properties()
+ expected = {'update-mode': 'upsert',
+ 'format.type': 'csv',
+ 'format.property-version': '1',
+ 'connector.property-version': '1',
+ 'connector.type': 'filesystem'}
+ assert properties == expected
+
+
+class BatchTableDescriptorTests(PyFlinkBatchTableTestCase, AbstractTableDescriptorTests):
+ pass
+
+
+class StreamDescriptorEndToEndTests(PyFlinkStreamTableTestCase):
+
+ def test_end_to_end(self):
+ source_path = os.path.join(self.tempdir, 'streaming.csv')
+ with open(source_path, 'w') as f:
+ lines = 'a,b,c\n' + \
+ '1,hi,hello\n' + \
+ '#comments\n' + \
+ "error line\n" + \
+ '2,"hi,world!",hello\n'
+ f.write(lines)
+ sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+ t_env = self.t_env
+ # connect source
+ t_env.connect(FileSystem().path(source_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .line_delimiter("\n")
+ .ignore_parse_errors()
+ .quote_character('"')
+ .comment_prefix("#")
+ .ignore_first_line()
+ .field("a", "INT")
+ .field("b", "VARCHAR")
+ .field("c", "VARCHAR"))\
+ .with_schema(Schema()
+ .field("a", "INT")
+ .field("b", "VARCHAR")
+ .field("c", "VARCHAR"))\
+ .in_append_mode()\
+ .register_table_source("source")
+ # connect sink
+ t_env.connect(FileSystem().path(sink_path))\
+ .with_format(OldCsv()
+ .field_delimiter(',')
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .with_schema(Schema()
+ .field("a", DataTypes.INT)
+ .field("b", DataTypes.STRING)
+ .field("c", DataTypes.STRING))\
+ .register_table_sink("sink")
+
+ t_env.scan("source") \
+ .select("a + 1, b, c") \
+ .insert_into("sink")
+ t_env.execute()
+
+ with open(sink_path, 'r') as f:
+ lines = f.read()
+ assert lines == '2,hi,hello\n' + '3,hi,world!,hello\n'
+
+
+if __name__ == '__main__':
+ import unittest
+
+ try:
+ import xmlrunner
+ testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index d0bfaf8..c88b985 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,7 +41,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
# registerExternalCatalog, getRegisteredExternalCatalog and listTables
# should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
- return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'connect',
+ return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}