Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/22 10:21:38 UTC

[flink] branch master updated: [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e16fa9f  [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
e16fa9f is described below

commit e16fa9fe1506ec725f5c5abb2b24afa246e17dae
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Tue May 21 16:25:16 2019 +0800

    [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
    
    Brief change log:
     - Add all of the existing descriptor interfaces, aligned with the Java Table API.
     - Add FileSystem connector and OldCsv format support.
     - The `schema(..)` of OldCsv will be added in FLINK-12588.
    
    This closes #8488
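
    A minimal end-to-end sketch of the new descriptor API, adapted from the tests added in
    this commit. The file paths, the table names and the pre-existing `t_env` (a
    StreamTableEnvironment or BatchTableEnvironment) used below are illustrative assumptions:

        from pyflink.table import FileSystem, OldCsv, Schema
        from pyflink.table.types import DataTypes

        # Register a CSV file as a table source named "source".
        t_env.connect(FileSystem().path("/tmp/input.csv")) \
             .with_format(OldCsv()
                          .field_delimiter(',')
                          .field("a", DataTypes.INT)
                          .field("b", DataTypes.STRING)) \
             .with_schema(Schema()
                          .field("a", DataTypes.INT)
                          .field("b", DataTypes.STRING)) \
             .register_table_source("source")

        # Register another CSV file as a table sink named "sink".
        t_env.connect(FileSystem().path("/tmp/output.csv")) \
             .with_format(OldCsv()
                          .field_delimiter(',')
                          .field("a", DataTypes.INT)
                          .field("b", DataTypes.STRING)) \
             .with_schema(Schema()
                          .field("a", DataTypes.INT)
                          .field("b", DataTypes.STRING)) \
             .register_table_sink("sink")

        # Run a simple pipeline from the registered source to the registered sink.
        t_env.scan("source") \
             .select("a + 1, b") \
             .insert_into("sink")
        t_env.execute()

    The same builder also exposes register_table_source_and_sink(name) to register a table
    as both a source and a sink in one call.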
---
 flink-python/pyflink/java_gateway.py               |   1 +
 flink-python/pyflink/table/__init__.py             |   5 +
 flink-python/pyflink/table/table_descriptor.py     | 510 +++++++++++++++++
 flink-python/pyflink/table/table_environment.py    |  91 +++
 .../pyflink/table/tests/test_descriptor.py         | 618 +++++++++++++++++++++
 .../table/tests/test_environment_completeness.py   |   2 +-
 6 files changed, 1226 insertions(+), 1 deletion(-)

diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index ed1dc89..e5c8330 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -101,6 +101,7 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.flink.table.api.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
+    java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
     java_import(gateway.jvm, "org.apache.flink.table.sources.*")
     java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
     java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index dcbc0ab..4ea3b3f 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,6 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
 from pyflink.table.table_source import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.table.window import Tumble, Session, Slide, Over
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem
 
 __all__ = [
     'TableEnvironment',
@@ -56,4 +57,8 @@ __all__ = [
     'Session',
     'Slide',
     'Over',
+    'Rowtime',
+    'Schema',
+    'OldCsv',
+    'FileSystem',
 ]
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
new file mode 100644
index 0000000..ed9a157
--- /dev/null
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -0,0 +1,510 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_method
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.type_utils import to_java_type
+
+if sys.version_info >= (3,):
+    unicode = str
+
+__all__ = [
+    'Rowtime',
+    'Schema',
+    'OldCsv',
+    'FileSystem'
+]
+
+
+class Descriptor(object):
+    """
+    Base class of the descriptors that adds a set of string-based, normalized properties for
+    describing DDL information.
+
+    Typical characteristics of a descriptor are:
+    - descriptors have a default constructor
+    - descriptors themselves contain very little logic
+    - corresponding validators validate the correctness (goal: have a single point of validation)
+
+    A descriptor is similar to a builder in a builder pattern and is therefore mutable for
+    building properties.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_descriptor):
+        self._j_descriptor = j_descriptor
+
+    def to_properties(self):
+        """
+        Converts this descriptor into a dict of properties.
+
+        :return: A dict containing all of the current properties.
+        """
+        return dict(self._j_descriptor.toProperties())
+
+
+class Rowtime(Descriptor):
+    """
+    Rowtime descriptor for describing an event time attribute in the schema.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_rowtime = gateway.jvm.Rowtime()
+        super(Rowtime, self).__init__(self._j_rowtime)
+
+    def timestamps_from_field(self, field_name):
+        """
+        Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into
+        the rowtime attribute.
+
+        :param field_name: The field to convert into a rowtime attribute.
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.timestampsFromField(field_name)
+        return self
+
+    def timestamps_from_source(self):
+        """
+        Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream
+        API record into the rowtime attribute and thus preserves the assigned timestamps from the
+        source.
+
+        .. note::
+            This extractor only works in streaming environments.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.timestampsFromSource()
+        return self
+
+    def timestamps_from_extractor(self, extractor):
+        """
+        Sets a custom timestamp extractor to be used for the rowtime attribute.
+
+        :param extractor: The Java canonical class name of the TimestampExtractor to extract the
+                          rowtime attribute from the physical type. The TimestampExtractor must
+                          have a public no-argument constructor and must be loadable by the
+                          current Java classloader.
+        :return: This rowtime descriptor.
+        """
+        gateway = get_gateway()
+        self._j_rowtime = self._j_rowtime.timestampsFromExtractor(
+            gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(extractor)
+                   .newInstance())
+        return self
+
+    def watermarks_periodic_ascending(self):
+        """
+        Sets a built-in watermark strategy for ascending rowtime attributes.
+
+        Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a
+        timestamp equal to the max timestamp are not late.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksPeriodicAscending()
+        return self
+
+    def watermarks_periodic_bounded(self, delay):
+        """
+        Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a
+        bounded time interval.
+
+        Emits watermarks which are the maximum observed timestamp minus the specified delay.
+
+        :param delay: Delay in milliseconds.
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksPeriodicBounded(delay)
+        return self
+
+    def watermarks_from_source(self):
+        """
+        Sets a built-in watermark strategy which indicates the watermarks should be preserved from
+        the underlying DataStream API and thus preserves the assigned watermarks from the source.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksFromSource()
+        return self
+
+    def watermarks_from_strategy(self, strategy):
+        """
+        Sets a custom watermark strategy to be used for the rowtime attribute.
+
+        :param strategy: The Java canonical class name of the WatermarkStrategy. The
+                         WatermarkStrategy must have a public no-argument constructor and must be
+                         loadable by the current Java classloader.
+        :return: This rowtime descriptor.
+        """
+        gateway = get_gateway()
+        self._j_rowtime = self._j_rowtime.watermarksFromStrategy(
+            gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(strategy)
+                   .newInstance())
+        return self
+
+
+class Schema(Descriptor):
+    """
+    Describes a schema of a table.
+
+    .. note::
+        Field names are matched by the exact name by default (case sensitive).
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_schema = gateway.jvm.Schema()
+        super(Schema, self).__init__(self._j_schema)
+
+    def field(self, field_name, field_type):
+        """
+        Adds a field with the field name and the data type or type string. Required.
+        This method can be called multiple times. The call order of this method also defines
+        the order of the fields in a row. The following document introduces the type strings:
+        https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#type-strings
+
+        :param field_name: The field name.
+        :param field_type: The data type or type string of the field.
+        :return: This schema object.
+        """
+        if isinstance(field_type, (str, unicode)):
+            self._j_schema = self._j_schema.field(field_name, field_type)
+        else:
+            self._j_schema = self._j_schema.field(field_name, to_java_type(field_type))
+        return self
+
+    def from_origin_field(self, origin_field_name):
+        """
+        Specifies the origin of the previously defined field. The origin field is defined by a
+        connector or format.
+
+        E.g. field("myString", Types.STRING).from_origin_field("CSV_MY_STRING")
+
+        .. note::
+            Field names are matched by the exact name by default (case sensitive).
+
+        :param origin_field_name: The origin field name.
+        :return: This schema object.
+        """
+        self._j_schema = get_method(self._j_schema, "from")(origin_field_name)
+        return self
+
+    def proctime(self):
+        """
+        Specifies the previously defined field as a processing-time attribute.
+
+        E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
+
+        :return: This schema object.
+        """
+        self._j_schema = self._j_schema.proctime()
+        return self
+
+    def rowtime(self, rowtime):
+        """
+        Specifies the previously defined field as an event-time attribute.
+
+        E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
+
+        :param rowtime: A :class:`Rowtime` descriptor.
+        :return: This schema object.
+        """
+        self._j_schema = self._j_schema.rowtime(rowtime._j_rowtime)
+        return self
+
+
+class FormatDescriptor(Descriptor):
+    """
+    Describes the format of data.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_format_descriptor):
+        self._j_format_descriptor = j_format_descriptor
+        super(FormatDescriptor, self).__init__(self._j_format_descriptor)
+
+
+class OldCsv(FormatDescriptor):
+    """
+    Format descriptor for comma-separated values (CSV).
+
+    .. note::
+        This descriptor describes Flink's non-standard CSV table source/sink. In the future, the
+        descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv`
+        format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
+        the old one for stream/batch filesystem operations for now.
+
+    .. note::
+        Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_csv = gateway.jvm.OldCsv()
+        super(OldCsv, self).__init__(self._j_csv)
+
+    def field_delimiter(self, delimiter):
+        """
+        Sets the field delimiter, "," by default.
+
+        :param delimiter: The field delimiter.
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.fieldDelimiter(delimiter)
+        return self
+
+    def line_delimiter(self, delimiter):
+        """
+        Sets the line delimiter, "\n" by default.
+
+        :param delimiter: The line delimiter.
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.lineDelimiter(delimiter)
+        return self
+
+    def field(self, field_name, field_type):
+        """
+        Adds a format field with the field name and the data type or type string. Required.
+        This method can be called multiple times. The call order of this method also defines
+        the order of the fields in the format.
+
+        :param field_name: The field name.
+        :param field_type: The data type or type string of the field.
+        :return: This :class:`OldCsv` object.
+        """
+        if isinstance(field_type, (str, unicode)):
+            self._j_csv = self._j_csv.field(field_name, field_type)
+        else:
+            self._j_csv = self._j_csv.field(field_name, to_java_type(field_type))
+        return self
+
+    def quote_character(self, quote_character):
+        """
+        Sets a quote character for String values, null by default.
+
+        :param quote_character: The quote character.
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.quoteCharacter(quote_character)
+        return self
+
+    def comment_prefix(self, prefix):
+        """
+        Sets a prefix to indicate comments, null by default.
+
+        :param prefix: The prefix to indicate comments.
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.commentPrefix(prefix)
+        return self
+
+    def ignore_parse_errors(self):
+        """
+        Skips records with parse errors instead of failing. By default, an exception is thrown.
+
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.ignoreParseErrors()
+        return self
+
+    def ignore_first_line(self):
+        """
+        Ignores the first line. By default, the first line is not skipped.
+
+        :return: This :class:`OldCsv` object.
+        """
+        self._j_csv = self._j_csv.ignoreFirstLine()
+        return self
+
+
+class ConnectorDescriptor(Descriptor):
+    """
+    Describes a connector to another system.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_connector_descriptor):
+        self._j_connector_descriptor = j_connector_descriptor
+        super(ConnectorDescriptor, self).__init__(self._j_connector_descriptor)
+
+
+class FileSystem(ConnectorDescriptor):
+    """
+    Connector descriptor for a file system.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_file_system = gateway.jvm.FileSystem()
+        super(FileSystem, self).__init__(self._j_file_system)
+
+    def path(self, path_str):
+        """
+        Sets the path to a file or directory in a file system.
+
+        :param path_str: The path of a file or directory.
+        :return: This :class:`FileSystem` object.
+        """
+        self._j_file_system = self._j_file_system.path(path_str)
+        return self
+
+
+class ConnectTableDescriptor(Descriptor):
+    """
+    Common class for tables created with :class:`pyflink.table.TableEnvironment.connect`.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_table_descriptor):
+        self._j_table_descriptor = j_table_descriptor
+        super(ConnectTableDescriptor, self).__init__(self._j_table_descriptor)
+
+    def with_format(self, format_descriptor):
+        """
+        Specifies the format that defines how to read data from a connector.
+
+        :param format_descriptor: The :class:`FormatDescriptor` for the resulting table,
+                                  e.g. :class:`OldCsv`.
+        :return: This object.
+        """
+        self._j_table_descriptor = \
+            self._j_table_descriptor.withFormat(format_descriptor._j_format_descriptor)
+        return self
+
+    def with_schema(self, schema):
+        """
+        Specifies the resulting table schema.
+
+        :param schema: The :class:`Schema` object for the resulting table.
+        :return: This object.
+        """
+        self._j_table_descriptor = self._j_table_descriptor.withSchema(schema._j_schema)
+        return self
+
+    def register_table_sink(self, name):
+        """
+        Searches for the specified table sink, configures it accordingly, and registers it as
+        a table under the given name.
+
+        :param name: Table name to be registered in the table environment.
+        :return: This object.
+        """
+        self._j_table_descriptor = self._j_table_descriptor.registerTableSink(name)
+        return self
+
+    def register_table_source(self, name):
+        """
+        Searches for the specified table source, configures it accordingly, and registers it as
+        a table under the given name.
+
+        :param name: Table name to be registered in the table environment.
+        :return: This object.
+        """
+        self._j_table_descriptor = self._j_table_descriptor.registerTableSource(name)
+        return self
+
+    def register_table_source_and_sink(self, name):
+        """
+        Searches for the specified table source and sink, configures them accordingly, and
+        registers them as a table under the given name.
+
+        :param name: Table name to be registered in the table environment.
+        :return: This object.
+        """
+        self._j_table_descriptor = self._j_table_descriptor.registerTableSourceAndSink(name)
+        return self
+
+
+class StreamTableDescriptor(ConnectTableDescriptor):
+    """
+    Descriptor for specifying a table source and/or sink in a streaming environment.
+    """
+
+    def __init__(self, j_stream_table_descriptor):
+        self._j_stream_table_descriptor = j_stream_table_descriptor
+        super(StreamTableDescriptor, self).__init__(self._j_stream_table_descriptor)
+
+    def in_append_mode(self):
+        """
+        Declares how to perform the conversion between a dynamic table and an external connector.
+
+        In append mode, a dynamic table and an external connector only exchange INSERT messages.
+
+        :return: This object.
+        """
+        self._j_stream_table_descriptor = self._j_stream_table_descriptor.inAppendMode()
+        return self
+
+    def in_retract_mode(self):
+        """
+        Declares how to perform the conversion between a dynamic table and an external connector.
+
+        In retract mode, a dynamic table and an external connector exchange ADD and RETRACT
+        messages.
+
+        An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+        UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+        the updating (new) row.
+
+        In this mode, a key must not be defined, as opposed to upsert mode. However, every update
+        consists of two messages, which is less efficient.
+
+        :return: This object.
+        """
+        self._j_stream_table_descriptor = self._j_stream_table_descriptor.inRetractMode()
+        return self
+
+    def in_upsert_mode(self):
+        """
+        Declares how to perform the conversion between a dynamic table and an external connector.
+
+        In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE
+        messages.
+
+        This mode requires a (possibly composite) unique key by which updates can be propagated.
+        The external connector needs to be aware of the unique key attribute in order to apply
+        messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages; DELETE
+        changes are encoded as DELETE messages.
+
+        The main difference to a retract stream is that UPDATE changes are encoded with a single
+        message and are therefore more efficient.
+
+        :return: This object.
+        """
+        self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode()
+        return self
+
+
+class BatchTableDescriptor(ConnectTableDescriptor):
+    """
+    Descriptor for specifying a table source and/or sink in a batch environment.
+    """
+
+    def __init__(self, j_batch_table_descriptor):
+        self._j_batch_table_descriptor = j_batch_table_descriptor
+        super(BatchTableDescriptor, self).__init__(self._j_batch_table_descriptor)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 5b9886d..dc77b04 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -20,6 +20,8 @@ from abc import ABCMeta, abstractmethod
 
 from pyflink.table.query_config import StreamQueryConfig, BatchQueryConfig, QueryConfig
 from pyflink.table.table_config import TableConfig
+from pyflink.table.table_descriptor import (StreamTableDescriptor, ConnectorDescriptor,
+                                            BatchTableDescriptor)
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table import Table
@@ -207,6 +209,35 @@ class TableEnvironment(object):
     def query_config(self):
         pass
 
+    @abstractmethod
+    def connect(self, connector_descriptor):
+        """
+        Creates a table source and/or table sink from a descriptor.
+
+        Descriptors allow for declaring the communication to external systems in an
+        implementation-agnostic way. The classpath is scanned for suitable table factories that
+        match the desired configuration.
+
+        The following example shows how to read from a connector using a JSON format and
+        register a table source as "MyTable":
+        ::
+            >>> table_env\
+            ...     .connect(ExternalSystemXYZ()
+            ...              .version("0.11"))\
+            ...     .with_format(Json()
+            ...                  .json_schema("{...}")
+            ...                 .fail_on_missing_field(False))\
+            ...     .with_schema(Schema()
+            ...                 .field("user-name", "VARCHAR")
+            ...                 .from_origin_field("u_name")
+            ...                 .field("count", "DECIMAL"))\
+            ...     .register_table_source("MyTable")
+
+        :param connector_descriptor: Connector descriptor describing the external system.
+        :return: A :class:`ConnectTableDescriptor` used to build the table source/sink.
+        """
+        pass
+
     @classmethod
     def create(cls, table_config):
         """
@@ -261,6 +292,36 @@ class StreamTableEnvironment(TableEnvironment):
         """
         return StreamQueryConfig(self._j_tenv.queryConfig())
 
+    def connect(self, connector_descriptor):
+        """
+        Creates a table source and/or table sink from a descriptor.
+
+        Descriptors allow for declaring the communication to external systems in an
+        implementation-agnostic way. The classpath is scanned for suitable table factories that
+        match the desired configuration.
+
+        The following example shows how to read from a connector using a JSON format and
+        register a table source as "MyTable":
+        ::
+            >>> table_env\
+            ...     .connect(ExternalSystemXYZ()
+            ...              .version("0.11"))\
+            ...     .with_format(Json()
+            ...                  .json_schema("{...}")
+            ...                 .fail_on_missing_field(False))\
+            ...     .with_schema(Schema()
+            ...                 .field("user-name", "VARCHAR")
+            ...                 .from_origin_field("u_name")
+            ...                 .field("count", "DECIMAL"))\
+            ...     .register_table_source("MyTable")
+
+        :param connector_descriptor: Connector descriptor describing the external system.
+        :return: A :class:`StreamTableDescriptor` used to build the table source/sink.
+        """
+        # type: (ConnectorDescriptor) -> StreamTableDescriptor
+        return StreamTableDescriptor(
+            self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+
 
 class BatchTableEnvironment(TableEnvironment):
 
@@ -288,3 +349,33 @@ class BatchTableEnvironment(TableEnvironment):
         :return: A new :class:`BatchQueryConfig`.
         """
         return BatchQueryConfig(self._j_tenv.queryConfig())
+
+    def connect(self, connector_descriptor):
+        """
+        Creates a table source and/or table sink from a descriptor.
+
+        Descriptors allow for declaring the communication to external systems in an
+        implementation-agnostic way. The classpath is scanned for suitable table factories that
+        match the desired configuration.
+
+        The following example shows how to read from a connector using a JSON format and
+        register a table source as "MyTable":
+        ::
+            >>> table_env\
+            ...     .connect(ExternalSystemXYZ()
+            ...              .version("0.11"))\
+            ...     .with_format(Json()
+            ...                  .json_schema("{...}")
+            ...                 .fail_on_missing_field(False))\
+            ...     .with_schema(Schema()
+            ...                 .field("user-name", "VARCHAR")
+            ...                 .from_origin_field("u_name")
+            ...                 .field("count", "DECIMAL"))\
+            ...     .register_table_source("MyTable")
+
+        :param connector_descriptor: Connector descriptor describing the external system.
+        :return: A :class:`BatchTableDescriptor` used to build the table source/sink.
+        """
+        # type: (ConnectorDescriptor) -> BatchTableDescriptor
+        return BatchTableDescriptor(
+            self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
new file mode 100644
index 0000000..f2c2976
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -0,0 +1,618 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema)
+from pyflink.table.table_sink import CsvTableSink
+from pyflink.table.types import DataTypes
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
+                                             PyFlinkBatchTableTestCase)
+
+
+class FileSystemDescriptorTests(PyFlinkTestCase):
+
+    def test_path(self):
+        file_system = FileSystem()
+
+        file_system.path("/test.csv")
+
+        properties = file_system.to_properties()
+        expected = {'connector.property-version': '1',
+                    'connector.type': 'filesystem',
+                    'connector.path': '/test.csv'}
+        assert properties == expected
+
+
+class OldCsvDescriptorTests(PyFlinkTestCase):
+
+    def test_field_delimiter(self):
+        csv = OldCsv()
+
+        csv.field_delimiter("|")
+
+        properties = csv.to_properties()
+        expected = {'format.field-delimiter': '|',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+    def test_line_delimiter(self):
+        csv = OldCsv()
+
+        csv.line_delimiter(";")
+
+        expected = {'format.type': 'csv',
+                    'format.property-version': '1',
+                    'format.line-delimiter': ';'}
+
+        properties = csv.to_properties()
+        assert properties == expected
+
+    def test_ignore_parse_errors(self):
+        csv = OldCsv()
+
+        csv.ignore_parse_errors()
+
+        properties = csv.to_properties()
+        expected = {'format.ignore-parse-errors': 'true',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+    def test_quote_character(self):
+        csv = OldCsv()
+
+        csv.quote_character("*")
+
+        properties = csv.to_properties()
+        expected = {'format.quote-character': '*',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+    def test_comment_prefix(self):
+        csv = OldCsv()
+
+        csv.comment_prefix("#")
+
+        properties = csv.to_properties()
+        expected = {'format.comment-prefix': '#',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+    def test_ignore_first_line(self):
+        csv = OldCsv()
+
+        csv.ignore_first_line()
+
+        properties = csv.to_properties()
+        expected = {'format.ignore-first-line': 'true',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+    def test_field(self):
+        csv = OldCsv()
+
+        csv.field("a", DataTypes.LONG)
+        csv.field("b", DataTypes.STRING)
+        csv.field("c", "SQL_TIMESTAMP")
+
+        properties = csv.to_properties()
+        expected = {'format.fields.0.name': 'a',
+                    'format.fields.0.type': 'BIGINT',
+                    'format.fields.1.name': 'b',
+                    'format.fields.1.type': 'VARCHAR',
+                    'format.fields.2.name': 'c',
+                    'format.fields.2.type': 'SQL_TIMESTAMP',
+                    'format.type': 'csv',
+                    'format.property-version': '1'}
+        assert properties == expected
+
+
+class RowTimeDescriptorTests(PyFlinkTestCase):
+
+    def test_timestamps_from_field(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.timestamps_from_field("rtime")
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.timestamps.type': 'from-field', 'rowtime.timestamps.from': 'rtime'}
+        assert properties == expect
+
+    def test_timestamps_from_source(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.timestamps_from_source()
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.timestamps.type': 'from-source'}
+        assert properties == expect
+
+    def test_timestamps_from_extractor(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.timestamps_from_extractor(
+            "org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor")
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.timestamps.type': 'custom',
+                  'rowtime.timestamps.class':
+                  'org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor',
+                  'rowtime.timestamps.serialized':
+                  'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
+                  'vbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLm'
+                  'FwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1'
+                  'Y6piFNsGAIAAHhwdAACdHM'}
+        assert properties == expect
+
+    def test_watermarks_periodic_ascending(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.watermarks_periodic_ascending()
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.watermarks.type': 'periodic-ascending'}
+        assert properties == expect
+
+    def test_watermarks_periodic_bounded(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.watermarks_periodic_bounded(1000)
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.watermarks.type': 'periodic-bounded',
+                  'rowtime.watermarks.delay': '1000'}
+        assert properties == expect
+
+    def test_watermarks_from_source(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.watermarks_from_source()
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.watermarks.type': 'from-source'}
+        assert properties == expect
+
+    def test_watermarks_from_strategy(self):
+        rowtime = Rowtime()
+
+        rowtime = rowtime.watermarks_from_strategy(
+            "org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner")
+
+        properties = rowtime.to_properties()
+        expect = {'rowtime.watermarks.type': 'custom',
+                  'rowtime.watermarks.class':
+                  'org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner',
+                  'rowtime.watermarks.serialized':
+                  'rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
+                  'vbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3'
+                  'RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY'
+                  '2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OW'
+                  'aT4CAAB4cA'}
+        assert properties == expect
+
+
+class SchemaDescriptorTests(PyFlinkTestCase):
+
+    def test_field(self):
+        schema = Schema()
+
+        schema = schema\
+            .field("int_field", DataTypes.INT)\
+            .field("long_field", DataTypes.LONG)\
+            .field("string_field", DataTypes.STRING)\
+            .field("timestamp_field", DataTypes.TIMESTAMP)\
+            .field("time_field", DataTypes.TIME)\
+            .field("date_field", DataTypes.DATE)\
+            .field("double_field", DataTypes.DOUBLE)\
+            .field("float_field", DataTypes.FLOAT)\
+            .field("byte_field", DataTypes.BYTE)\
+            .field("short_field", DataTypes.SHORT)\
+            .field("boolean_field", DataTypes.BOOLEAN)
+
+        properties = schema.to_properties()
+        expected = {'schema.0.name': 'int_field',
+                    'schema.0.type': 'INT',
+                    'schema.1.name': 'long_field',
+                    'schema.1.type': 'BIGINT',
+                    'schema.2.name': 'string_field',
+                    'schema.2.type': 'VARCHAR',
+                    'schema.3.name': 'timestamp_field',
+                    'schema.3.type': 'TIMESTAMP',
+                    'schema.4.name': 'time_field',
+                    'schema.4.type': 'TIME',
+                    'schema.5.name': 'date_field',
+                    'schema.5.type': 'DATE',
+                    'schema.6.name': 'double_field',
+                    'schema.6.type': 'DOUBLE',
+                    'schema.7.name': 'float_field',
+                    'schema.7.type': 'FLOAT',
+                    'schema.8.name': 'byte_field',
+                    'schema.8.type': 'TINYINT',
+                    'schema.9.name': 'short_field',
+                    'schema.9.type': 'SMALLINT',
+                    'schema.10.name': 'boolean_field',
+                    'schema.10.type': 'BOOLEAN'}
+        assert properties == expected
+
+    def test_field_in_string(self):
+        schema = Schema()
+
+        schema = schema\
+            .field("int_field", 'INT')\
+            .field("long_field", 'BIGINT')\
+            .field("string_field", 'VARCHAR')\
+            .field("timestamp_field", 'SQL_TIMESTAMP')\
+            .field("time_field", 'SQL_TIME')\
+            .field("date_field", 'SQL_DATE')\
+            .field("double_field", 'DOUBLE')\
+            .field("float_field", 'FLOAT')\
+            .field("byte_field", 'TINYINT')\
+            .field("short_field", 'SMALLINT')\
+            .field("boolean_field", 'BOOLEAN')
+
+        properties = schema.to_properties()
+        expected = {'schema.0.name': 'int_field',
+                    'schema.0.type': 'INT',
+                    'schema.1.name': 'long_field',
+                    'schema.1.type': 'BIGINT',
+                    'schema.2.name': 'string_field',
+                    'schema.2.type': 'VARCHAR',
+                    'schema.3.name': 'timestamp_field',
+                    'schema.3.type': 'SQL_TIMESTAMP',
+                    'schema.4.name': 'time_field',
+                    'schema.4.type': 'SQL_TIME',
+                    'schema.5.name': 'date_field',
+                    'schema.5.type': 'SQL_DATE',
+                    'schema.6.name': 'double_field',
+                    'schema.6.type': 'DOUBLE',
+                    'schema.7.name': 'float_field',
+                    'schema.7.type': 'FLOAT',
+                    'schema.8.name': 'byte_field',
+                    'schema.8.type': 'TINYINT',
+                    'schema.9.name': 'short_field',
+                    'schema.9.type': 'SMALLINT',
+                    'schema.10.name': 'boolean_field',
+                    'schema.10.type': 'BOOLEAN'}
+        assert properties == expected
+
+    def test_from_origin_field(self):
+        schema = Schema()
+
+        schema = schema\
+            .field("int_field", DataTypes.INT)\
+            .field("long_field", DataTypes.LONG).from_origin_field("origin_field_a")\
+            .field("string_field", DataTypes.STRING)
+
+        properties = schema.to_properties()
+        expected = {'schema.0.name': 'int_field',
+                    'schema.0.type': 'INT',
+                    'schema.1.name': 'long_field',
+                    'schema.1.type': 'BIGINT',
+                    'schema.1.from': 'origin_field_a',
+                    'schema.2.name': 'string_field',
+                    'schema.2.type': 'VARCHAR'}
+        assert properties == expected
+
+    def test_proctime(self):
+        schema = Schema()
+
+        schema = schema\
+            .field("int_field", DataTypes.INT)\
+            .field("ptime", DataTypes.LONG).proctime()\
+            .field("string_field", DataTypes.STRING)
+
+        properties = schema.to_properties()
+        expected = {'schema.0.name': 'int_field',
+                    'schema.0.type': 'INT',
+                    'schema.1.name': 'ptime',
+                    'schema.1.type': 'BIGINT',
+                    'schema.1.proctime': 'true',
+                    'schema.2.name': 'string_field',
+                    'schema.2.type': 'VARCHAR'}
+        assert properties == expected
+
+    def test_rowtime(self):
+        schema = Schema()
+
+        schema = schema\
+            .field("int_field", DataTypes.INT)\
+            .field("long_field", DataTypes.LONG)\
+            .field("rtime", DataTypes.LONG)\
+            .rowtime(
+                Rowtime().timestamps_from_field("long_field").watermarks_periodic_bounded(5000))\
+            .field("string_field", DataTypes.STRING)
+
+        properties = schema.to_properties()
+        print(properties)
+        expected = {'schema.0.name': 'int_field',
+                    'schema.0.type': 'INT',
+                    'schema.1.name': 'long_field',
+                    'schema.1.type': 'BIGINT',
+                    'schema.2.name': 'rtime',
+                    'schema.2.type': 'BIGINT',
+                    'schema.2.rowtime.timestamps.type': 'from-field',
+                    'schema.2.rowtime.timestamps.from': 'long_field',
+                    'schema.2.rowtime.watermarks.type': 'periodic-bounded',
+                    'schema.2.rowtime.watermarks.delay': '5000',
+                    'schema.3.name': 'string_field',
+                    'schema.3.type': 'VARCHAR'}
+        assert properties == expected
+
+
+class AbstractTableDescriptorTests(object):
+
+    def test_with_format(self):
+        descriptor = self.t_env.connect(FileSystem())
+
+        descriptor.with_format(OldCsv().field("a", "INT"))
+
+        properties = descriptor.to_properties()
+
+        expected = {'format.type': 'csv',
+                    'format.property-version': '1',
+                    'format.fields.0.name': 'a',
+                    'format.fields.0.type': 'INT',
+                    'connector.property-version': '1',
+                    'connector.type': 'filesystem'}
+        assert properties == expected
+
+    def test_with_schema(self):
+        descriptor = self.t_env.connect(FileSystem())
+
+        descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT"))
+
+        properties = descriptor.to_properties()
+        expected = {'schema.0.name': 'a',
+                    'schema.0.type': 'INT',
+                    'format.type': 'csv',
+                    'format.property-version': '1',
+                    'connector.type': 'filesystem',
+                    'connector.property-version': '1'}
+        assert properties == expected
+
+    def test_register_table_sink(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("source", csv_source)
+        # connect sink
+        sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+        if os.path.isfile(sink_path):
+            os.remove(sink_path)
+
+        t_env.connect(FileSystem().path(sink_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .with_schema(Schema()
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .register_table_sink("sink")
+        t_env.scan("source") \
+             .select("a + 1, b, c") \
+             .insert_into("sink")
+        t_env.execute()
+
+        with open(sink_path, 'r') as f:
+            lines = f.read()
+            assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
+
+    def test_register_table_source(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+        if os.path.isfile(sink_path):
+            os.remove(sink_path)
+        t_env.register_table_sink(
+            "sink",
+            field_names, field_types, CsvTableSink(sink_path))
+
+        # connect source
+        t_env.connect(FileSystem().path(source_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .with_schema(Schema()
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .register_table_source("source")
+        t_env.scan("source") \
+             .select("a + 1, b, c") \
+             .insert_into("sink")
+        t_env.execute()
+
+        with open(sink_path, 'r') as f:
+            lines = f.read()
+            assert lines == '2,Hi,Hello\n' + '3,Hello,Hello\n'
+
+    def test_register_table_source_and_sink(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        self.prepare_csv_source(source_path, data, field_types, field_names)
+        sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+        if os.path.isfile(sink_path):
+            os.remove(sink_path)
+        t_env = self.t_env
+
+        t_env.connect(FileSystem().path(source_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .with_schema(Schema()
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .register_table_source_and_sink("source")
+        t_env.connect(FileSystem().path(sink_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .with_schema(Schema()
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .register_table_source_and_sink("sink")
+        t_env.scan("source") \
+             .select("a + 1, b, c") \
+             .insert_into("sink")
+        t_env.execute()
+
+        with open(sink_path, 'r') as f:
+            lines = f.read()
+            assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
+
+
+class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescriptorTests):
+
+    def test_in_append_mode(self):
+        descriptor = self.t_env.connect(FileSystem())
+
+        descriptor\
+            .with_format(OldCsv())\
+            .in_append_mode()
+
+        properties = descriptor.to_properties()
+        expected = {'update-mode': 'append',
+                    'format.type': 'csv',
+                    'format.property-version': '1',
+                    'connector.property-version': '1',
+                    'connector.type': 'filesystem'}
+        assert properties == expected
+
+    def test_in_retract_mode(self):
+        descriptor = self.t_env.connect(FileSystem())
+
+        descriptor \
+            .with_format(OldCsv()) \
+            .in_retract_mode()
+
+        properties = descriptor.to_properties()
+        expected = {'update-mode': 'retract',
+                    'format.type': 'csv',
+                    'format.property-version': '1',
+                    'connector.property-version': '1',
+                    'connector.type': 'filesystem'}
+        assert properties == expected
+
+    def test_in_upsert_mode(self):
+        descriptor = self.t_env.connect(FileSystem())
+
+        descriptor \
+            .with_format(OldCsv()) \
+            .in_upsert_mode()
+
+        properties = descriptor.to_properties()
+        expected = {'update-mode': 'upsert',
+                    'format.type': 'csv',
+                    'format.property-version': '1',
+                    'connector.property-version': '1',
+                    'connector.type': 'filesystem'}
+        assert properties == expected
+
+
+class BatchTableDescriptorTests(PyFlinkBatchTableTestCase, AbstractTableDescriptorTests):
+    pass
+
+
+class StreamDescriptorEndToEndTests(PyFlinkStreamTableTestCase):
+
+    def test_end_to_end(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        with open(source_path, 'w') as f:
+            lines = 'a,b,c\n' + \
+                    '1,hi,hello\n' + \
+                    '#comments\n' + \
+                    "error line\n" + \
+                    '2,"hi,world!",hello\n'
+            f.write(lines)
+            f.close()
+        sink_path = os.path.join(self.tempdir, 'streaming2.csv')
+        t_env = self.t_env
+        # connect source
+        t_env.connect(FileSystem().path(source_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .line_delimiter("\n")
+                          .ignore_parse_errors()
+                          .quote_character('"')
+                          .comment_prefix("#")
+                          .ignore_first_line()
+                          .field("a", "INT")
+                          .field("b", "VARCHAR")
+                          .field("c", "VARCHAR"))\
+             .with_schema(Schema()
+                          .field("a", "INT")
+                          .field("b", "VARCHAR")
+                          .field("c", "VARCHAR"))\
+             .in_append_mode()\
+             .register_table_source("source")
+        # connect sink
+        t_env.connect(FileSystem().path(sink_path))\
+             .with_format(OldCsv()
+                          .field_delimiter(',')
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .with_schema(Schema()
+                          .field("a", DataTypes.INT)
+                          .field("b", DataTypes.STRING)
+                          .field("c", DataTypes.STRING))\
+             .register_table_sink("sink")
+
+        t_env.scan("source") \
+             .select("a + 1, b, c") \
+             .insert_into("sink")
+        t_env.execute()
+
+        with open(sink_path, 'r') as f:
+            lines = f.read()
+            assert lines == '2,hi,hello\n' + '3,hi,world!,hello\n'
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index d0bfaf8..c88b985 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,7 +41,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerExternalCatalog, getRegisteredExternalCatalog and listTables
         # should be supported when catalog supported in python.
         # getCompletionHints has been deprecated. It will be removed in the next release.
-        return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'connect',
+        return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
                 'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}