You are viewing a plain-text version of this content; its canonical link is not preserved in this rendering.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/05 05:38:18 UTC
[flink] branch master updated: [FLINK-12767][python] Support user
defined connectors/format
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 80d7ba4 [FLINK-12767][python] Support user defined connectors/format
80d7ba4 is described below
commit 80d7ba440bd7389e76536e3b91ca227f4f850a67
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Mon Jun 10 12:20:14 2019 +0800
[FLINK-12767][python] Support user defined connectors/format
This closes #8719
---
flink-python/pom.xml | 6 ++
flink-python/pyflink/java_gateway.py | 2 +
flink-python/pyflink/table/descriptors.py | 112 ++++++++++++++++++++-
.../pyflink/table/tests/test_descriptor.py | 35 ++++++-
.../python/CustomConnectorDescriptor.java | 72 +++++++++++++
.../descriptors/python/CustomFormatDescriptor.java | 71 +++++++++++++
6 files changed, 296 insertions(+), 2 deletions(-)
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 276b890..ad4929f 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -56,6 +56,12 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Python API dependencies -->
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index bf8ad76..9bb0b62 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -119,6 +119,8 @@ def import_flink_view(gateway):
java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
java_import(gateway.jvm, "org.apache.flink.table.catalog.*")
java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
+ java_import(gateway.jvm, "org.apache.flink.table.descriptors.python.*")
+ java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.types.*")
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index cb8fc7b..457fc4f 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -24,6 +24,7 @@ from pyflink.table.types import _to_java_type
from pyflink.java_gateway import get_gateway
if sys.version >= '3':
+ long = int  # Python 3 has no separate long type; alias it so the (int, long) isinstance checks work
unicode = str
__all__ = [
@@ -39,7 +40,9 @@ __all__ = [
'FileSystem',
'ConnectTableDescriptor',
'StreamTableDescriptor',
- 'BatchTableDescriptor'
+ 'BatchTableDescriptor',
+ 'CustomConnectorDescriptor',
+ 'CustomFormatDescriptor'
]
@@ -611,6 +614,58 @@ class Json(FormatDescriptor):
return self
+class CustomFormatDescriptor(FormatDescriptor):
+    """
+    Describes a custom format of data, identified by an arbitrary type string and configured
+    through free-form string properties.
+    """
+
+    def __init__(self, type, version):
+        """
+        Constructs a :class:`CustomFormatDescriptor`.
+
+        :param type: String that identifies this format.
+        :param version: Property version for backwards compatibility.
+        :raises TypeError: If ``type`` is not a string or ``version`` is not an integer.
+        """
+
+        if not isinstance(type, (str, unicode)):
+            raise TypeError("type must be of type str.")
+        # bool is a subclass of int, but the Java constructor takes an int, so reject it here.
+        if not isinstance(version, (int, long)) or isinstance(version, bool):
+            raise TypeError("version must be of type int.")
+        gateway = get_gateway()
+        super(CustomFormatDescriptor, self).__init__(
+            gateway.jvm.CustomFormatDescriptor(type, version))
+
+    def property(self, key, value):
+        """
+        Adds a configuration property for the format.
+
+        :param key: The property key to be set.
+        :param value: The property value to be set.
+        :return: This object.
+        :raises TypeError: If ``key`` or ``value`` is not a string.
+        """
+
+        if not isinstance(key, (str, unicode)):
+            raise TypeError("key must be of type str.")
+        if not isinstance(value, (str, unicode)):
+            raise TypeError("value must be of type str.")
+        self._j_format_descriptor = self._j_format_descriptor.property(key, value)
+        return self
+
+    def properties(self, property_dict):
+        """
+        Adds a set of properties for the format.
+
+        :param property_dict: The dict containing configuration properties for the format.
+                              Both the keys and values must be strings.
+        :return: This object.
+        :raises TypeError: If ``property_dict`` is not a dict, or any of its keys or values
+                           is not a string.
+        """
+
+        if not isinstance(property_dict, dict):
+            raise TypeError("property_dict must be of type dict.")
+        # Mirror the checks in property(): the Java side declares Map<String, String>, so
+        # non-string entries are rejected here with a clear TypeError instead of being
+        # handed to the JVM.
+        for key, value in property_dict.items():
+            if not isinstance(key, (str, unicode)):
+                raise TypeError("property_dict keys must be of type str.")
+            if not isinstance(value, (str, unicode)):
+                raise TypeError("property_dict values must be of type str.")
+        self._j_format_descriptor = self._j_format_descriptor.properties(property_dict)
+        return self
+
+
class ConnectorDescriptor(Descriptor):
"""
Describes a connector to an other system.
@@ -1126,6 +1181,61 @@ class Elasticsearch(ConnectorDescriptor):
return self
+class CustomConnectorDescriptor(ConnectorDescriptor):
+    """
+    Describes a custom connector to another system, identified by an arbitrary type string and
+    configured through free-form string properties.
+    """
+
+    def __init__(self, type, version, format_needed):
+        """
+        Constructs a :class:`CustomConnectorDescriptor`.
+
+        :param type: String that identifies this connector.
+        :param version: Property version for backwards compatibility.
+        :param format_needed: Flag for basic validation of a needed format descriptor.
+        :raises TypeError: If ``type`` is not a string, ``version`` is not an integer or
+                           ``format_needed`` is not a bool.
+        """
+
+        if not isinstance(type, (str, unicode)):
+            raise TypeError("type must be of type str.")
+        # bool is a subclass of int, but the Java constructor takes an int, so reject it here.
+        if not isinstance(version, (int, long)) or isinstance(version, bool):
+            raise TypeError("version must be of type int.")
+        if not isinstance(format_needed, bool):
+            raise TypeError("format_needed must be of type bool.")
+        gateway = get_gateway()
+        super(CustomConnectorDescriptor, self).__init__(
+            gateway.jvm.CustomConnectorDescriptor(type, version, format_needed))
+
+    def property(self, key, value):
+        """
+        Adds a configuration property for the connector.
+
+        :param key: The property key to be set.
+        :param value: The property value to be set.
+        :return: This object.
+        :raises TypeError: If ``key`` or ``value`` is not a string.
+        """
+
+        if not isinstance(key, (str, unicode)):
+            raise TypeError("key must be of type str.")
+        if not isinstance(value, (str, unicode)):
+            raise TypeError("value must be of type str.")
+        self._j_connector_descriptor = self._j_connector_descriptor.property(key, value)
+        return self
+
+    def properties(self, property_dict):
+        """
+        Adds a set of properties for the connector.
+
+        :param property_dict: The dict containing configuration properties for the connector.
+                              Both the keys and values must be strings.
+        :return: This object.
+        :raises TypeError: If ``property_dict`` is not a dict, or any of its keys or values
+                           is not a string.
+        """
+
+        if not isinstance(property_dict, dict):
+            raise TypeError("property_dict must be of type dict.")
+        # Mirror the checks in property(): the Java side declares Map<String, String>, so
+        # non-string entries are rejected here with a clear TypeError instead of being
+        # handed to the JVM.
+        for key, value in property_dict.items():
+            if not isinstance(key, (str, unicode)):
+                raise TypeError("property_dict keys must be of type str.")
+            if not isinstance(value, (str, unicode)):
+                raise TypeError("property_dict values must be of type str.")
+        self._j_connector_descriptor = self._j_connector_descriptor.properties(property_dict)
+        return self
+
+
class ConnectTableDescriptor(Descriptor):
"""
Common class for table's created with :class:`pyflink.table.TableEnvironment.connect`.
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 1f313a7..1c0f36e 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -18,7 +18,8 @@
import os
from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
- Elasticsearch, Csv, Avro, Json)
+ Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor,
+ CustomFormatDescriptor)
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -354,6 +355,22 @@ class ElasticsearchDescriptorTest(PyFlinkTestCase):
self.assertEqual(expected, properties)
+class CustomConnectorDescriptorTests(PyFlinkTestCase):
+
+    def test_custom_connector(self):
+        custom_connector = CustomConnectorDescriptor('kafka', 1, True) \
+            .property('connector.topic', 'topic1') \
+            .properties({'connector.version': '0.11', 'connector.startup-mode': 'earliest-offset'})
+
+        properties = custom_connector.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.property-version': '1',
+                    'connector.topic': 'topic1',
+                    'connector.version': '0.11',
+                    'connector.startup-mode': 'earliest-offset'}
+        self.assertEqual(expected, properties)
+
+    def test_custom_connector_invalid_arguments(self):
+        # Constructor arguments are type-checked before the JVM descriptor is created.
+        with self.assertRaises(TypeError):
+            CustomConnectorDescriptor(1, 1, True)
+        with self.assertRaises(TypeError):
+            CustomConnectorDescriptor('kafka', '1', True)
+        with self.assertRaises(TypeError):
+            CustomConnectorDescriptor('kafka', 1, 'true')
+
+        custom_connector = CustomConnectorDescriptor('kafka', 1, True)
+        with self.assertRaises(TypeError):
+            custom_connector.property('connector.topic', 1)
+        with self.assertRaises(TypeError):
+            custom_connector.properties([('connector.topic', 'topic1')])
+
+
class OldCsvDescriptorTests(PyFlinkTestCase):
def test_field_delimiter(self):
@@ -663,6 +680,22 @@ class JsonDescriptorTests(PyFlinkTestCase):
self.assertEqual(expected, properties)
+class CustomFormatDescriptorTests(PyFlinkTestCase):
+
+    def test_custom_format_descriptor(self):
+        custom_format = CustomFormatDescriptor('json', 1) \
+            .property('format.schema', 'ROW<a INT, b VARCHAR>') \
+            .properties({'format.fail-on-missing-field': 'true'})
+
+        expected = {'format.fail-on-missing-field': 'true',
+                    'format.schema': 'ROW<a INT, b VARCHAR>',
+                    'format.property-version': '1',
+                    'format.type': 'json'}
+
+        properties = custom_format.to_properties()
+        self.assertEqual(expected, properties)
+
+    def test_custom_format_descriptor_invalid_arguments(self):
+        # Constructor arguments are type-checked before the JVM descriptor is created.
+        with self.assertRaises(TypeError):
+            CustomFormatDescriptor(1, 1)
+        with self.assertRaises(TypeError):
+            CustomFormatDescriptor('json', '1')
+
+        custom_format = CustomFormatDescriptor('json', 1)
+        with self.assertRaises(TypeError):
+            custom_format.property('format.schema', 1)
+        with self.assertRaises(TypeError):
+            custom_format.properties('format.fail-on-missing-field=true')
+
+
class RowTimeDescriptorTests(PyFlinkTestCase):
def test_timestamps_from_field(self):
diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java
new file mode 100644
index 0000000..34a3c71
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Map;
+
+/**
+ * Describes a user-defined connector to another system. Instead of typed setters it accepts
+ * arbitrary string properties, so connectors without a dedicated descriptor class can still be
+ * configured (e.g. from the Python Table API).
+ */
+@Internal
+public class CustomConnectorDescriptor extends ConnectorDescriptor {
+
+	/** Connector-specific properties collected via {@link #property} and {@link #properties}. */
+	private final DescriptorProperties connectorProperties = new DescriptorProperties();
+
+	/**
+	 * Creates a descriptor for the connector identified by {@code type}.
+	 *
+	 * @param type String that identifies this connector.
+	 * @param version Property version for backwards compatibility.
+	 * @param formatNeeded Flag for basic validation of a needed format descriptor.
+	 */
+	public CustomConnectorDescriptor(String type, int version, boolean formatNeeded) {
+		super(type, version, formatNeeded);
+	}
+
+	/**
+	 * Adds a single configuration property for the connector.
+	 *
+	 * @param key The property key to be set.
+	 * @param value The property value to be set.
+	 * @return This descriptor, for call chaining.
+	 */
+	public CustomConnectorDescriptor property(String key, String value) {
+		connectorProperties.putString(key, value);
+		return this;
+	}
+
+	/**
+	 * Adds every entry of the given map as a configuration property for the connector.
+	 *
+	 * @param properties The properties to add.
+	 * @return This descriptor, for call chaining.
+	 */
+	public CustomConnectorDescriptor properties(Map<String, String> properties) {
+		connectorProperties.putProperties(properties);
+		return this;
+	}
+
+	@Override
+	protected Map<String, String> toConnectorProperties() {
+		return connectorProperties.asMap();
+	}
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java
new file mode 100644
index 0000000..0ea17dc
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+
+import java.util.Map;
+
+/**
+ * Describes a user-defined format of data. Instead of typed setters it accepts arbitrary string
+ * properties, so formats without a dedicated descriptor class can still be configured
+ * (e.g. from the Python Table API).
+ */
+@Internal
+public class CustomFormatDescriptor extends FormatDescriptor {
+
+	/** Format-specific properties collected via {@link #property} and {@link #properties}. */
+	private final DescriptorProperties formatProperties = new DescriptorProperties();
+
+	/**
+	 * Creates a descriptor for the format identified by {@code type}.
+	 *
+	 * @param type String that identifies this format.
+	 * @param version Property version for backwards compatibility.
+	 */
+	public CustomFormatDescriptor(String type, int version) {
+		super(type, version);
+	}
+
+	/**
+	 * Adds a single configuration property for the format.
+	 *
+	 * @param key The property key to be set.
+	 * @param value The property value to be set.
+	 * @return This descriptor, for call chaining.
+	 */
+	public CustomFormatDescriptor property(String key, String value) {
+		formatProperties.putString(key, value);
+		return this;
+	}
+
+	/**
+	 * Adds every entry of the given map as a configuration property for the format.
+	 *
+	 * @param properties The properties to add.
+	 * @return This descriptor, for call chaining.
+	 */
+	public CustomFormatDescriptor properties(Map<String, String> properties) {
+		formatProperties.putProperties(properties);
+		return this;
+	}
+
+	@Override
+	protected Map<String, String> toFormatProperties() {
+		return formatProperties.asMap();
+	}
+}