Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/05 05:38:18 UTC

[flink] branch master updated: [FLINK-12767][python] Support user defined connectors/format

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 80d7ba4  [FLINK-12767][python] Support user defined connectors/format
80d7ba4 is described below

commit 80d7ba440bd7389e76536e3b91ca227f4f850a67
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Mon Jun 10 12:20:14 2019 +0800

    [FLINK-12767][python] Support user defined connectors/format
    
    This closes #8719
---
 flink-python/pom.xml                               |   6 ++
 flink-python/pyflink/java_gateway.py               |   2 +
 flink-python/pyflink/table/descriptors.py          | 112 ++++++++++++++++++++-
 .../pyflink/table/tests/test_descriptor.py         |  35 ++++++-
 .../python/CustomConnectorDescriptor.java          |  72 +++++++++++++
 .../descriptors/python/CustomFormatDescriptor.java |  71 +++++++++++++
 6 files changed, 296 insertions(+), 2 deletions(-)
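
For orientation, a minimal usage sketch of the new Python classes (not part of the
commit itself): it assumes an existing StreamTableEnvironment t_env and that a matching
connector/format factory (e.g. Kafka 0.11 with a JSON format) is on the classpath; the
table name 'mySource' is illustrative.

    from pyflink.table.descriptors import (CustomConnectorDescriptor,
                                           CustomFormatDescriptor, Schema)
    from pyflink.table.types import DataTypes

    # Describe the connector purely through string properties instead of a
    # dedicated descriptor class.
    connector = CustomConnectorDescriptor('kafka', 1, True) \
        .property('connector.topic', 'topic1') \
        .properties({'connector.version': '0.11',
                     'connector.startup-mode': 'earliest-offset'})

    # Describe the format the same way.
    format_descriptor = CustomFormatDescriptor('json', 1) \
        .property('format.fail-on-missing-field', 'true') \
        .property('format.schema', 'ROW<a INT, b VARCHAR>')

    t_env.connect(connector) \
        .with_format(format_descriptor) \
        .with_schema(Schema()
                     .field('a', DataTypes.INT())
                     .field('b', DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source('mySource')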

diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 276b890..ad4929f 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -56,6 +56,12 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 
 		<!-- Python API dependencies -->
 
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index bf8ad76..9bb0b62 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -119,6 +119,8 @@ def import_flink_view(gateway):
     java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
     java_import(gateway.jvm, "org.apache.flink.table.catalog.*")
     java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
+    java_import(gateway.jvm, "org.apache.flink.table.descriptors.python.*")
+    java_import(gateway.jvm, "org.apache.flink.table.sources.*")
     java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
     java_import(gateway.jvm, "org.apache.flink.table.sources.*")
     java_import(gateway.jvm, "org.apache.flink.table.types.*")
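
The added java_import lines make the new descriptor classes resolvable by their short
names on the Py4J gateway, which is how the Python wrappers in descriptors.py reach
them. Roughly (a sketch reusing the get_gateway() helper from this module):

    from pyflink.java_gateway import get_gateway

    gateway = get_gateway()
    # Once the package is imported into the gateway view, the unqualified class
    # name is visible on gateway.jvm.
    j_connector = gateway.jvm.CustomConnectorDescriptor('kafka', 1, True)
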
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index cb8fc7b..457fc4f 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -24,6 +24,7 @@ from pyflink.table.types import _to_java_type
 from pyflink.java_gateway import get_gateway
 
 if sys.version >= '3':
+    long = int
     unicode = str
 
 __all__ = [
@@ -39,7 +40,9 @@ __all__ = [
     'FileSystem',
     'ConnectTableDescriptor',
     'StreamTableDescriptor',
-    'BatchTableDescriptor'
+    'BatchTableDescriptor',
+    'CustomConnectorDescriptor',
+    'CustomFormatDescriptor'
 ]
 
 
@@ -611,6 +614,58 @@ class Json(FormatDescriptor):
         return self
 
 
+class CustomFormatDescriptor(FormatDescriptor):
+    """
+    Describes the custom format of data.
+    """
+
+    def __init__(self, type, version):
+        """
+        Constructs a :class:`CustomFormatDescriptor`.
+
+        :param type: String that identifies this format.
+        :param version: Property version for backwards compatibility.
+        """
+
+        if not isinstance(type, (str, unicode)):
+            raise TypeError("type must be of type str.")
+        if not isinstance(version, (int, long)):
+            raise TypeError("version must be of type int.")
+        gateway = get_gateway()
+        super(CustomFormatDescriptor, self).__init__(
+            gateway.jvm.CustomFormatDescriptor(type, version))
+
+    def property(self, key, value):
+        """
+        Adds a configuration property for the format.
+
+        :param key: The property key to be set.
+        :param value: The property value to be set.
+        :return: This object.
+        """
+
+        if not isinstance(key, (str, unicode)):
+            raise TypeError("key must be of type str.")
+        if not isinstance(value, (str, unicode)):
+            raise TypeError("value must be of type str.")
+        self._j_format_descriptor = self._j_format_descriptor.property(key, value)
+        return self
+
+    def properties(self, property_dict):
+        """
+        Adds a set of properties for the format.
+
+        :param property_dict: A dict containing configuration properties for the format.
+                              Both the keys and values should be strings.
+        :return: This object.
+        """
+
+        if not isinstance(property_dict, dict):
+            raise TypeError("property_dict must be of type dict.")
+        self._j_format_descriptor = self._j_format_descriptor.properties(property_dict)
+        return self
+
+
 class ConnectorDescriptor(Descriptor):
     """
     Describes a connector to another system.
@@ -1126,6 +1181,61 @@ class Elasticsearch(ConnectorDescriptor):
         return self
 
 
+class CustomConnectorDescriptor(ConnectorDescriptor):
+    """
+    Describes a custom connector to another system.
+    """
+
+    def __init__(self, type, version, format_needed):
+        """
+        Constructs a :class:`CustomConnectorDescriptor`.
+
+        :param type: String that identifies this connector.
+        :param version: Property version for backwards compatibility.
+        :param format_needed: Flag for basic validation of a needed format descriptor.
+        """
+
+        if not isinstance(type, (str, unicode)):
+            raise TypeError("type must be of type str.")
+        if not isinstance(version, (int, long)):
+            raise TypeError("version must be of type int.")
+        if not isinstance(format_needed, bool):
+            raise TypeError("format_needed must be of type bool.")
+        gateway = get_gateway()
+        super(CustomConnectorDescriptor, self).__init__(
+            gateway.jvm.CustomConnectorDescriptor(type, version, format_needed))
+
+    def property(self, key, value):
+        """
+        Adds a configuration property for the connector.
+
+        :param key: The property key to be set.
+        :param value: The property value to be set.
+        :return: This object.
+        """
+
+        if not isinstance(key, (str, unicode)):
+            raise TypeError("key must be of type str.")
+        if not isinstance(value, (str, unicode)):
+            raise TypeError("value must be of type str.")
+        self._j_connector_descriptor = self._j_connector_descriptor.property(key, value)
+        return self
+
+    def properties(self, property_dict):
+        """
+        Adds a set of properties for the connector.
+
+        :param property_dict: A dict containing configuration properties for the connector.
+                              Both the keys and values should be strings.
+        :return: This object.
+        """
+
+        if not isinstance(property_dict, dict):
+            raise TypeError("property_dict must be of type dict.")
+        self._j_connector_descriptor = self._j_connector_descriptor.properties(property_dict)
+        return self
+
+
 class ConnectTableDescriptor(Descriptor):
     """
     Common class for tables created with :class:`pyflink.table.TableEnvironment.connect`.
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 1f313a7..1c0f36e 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -18,7 +18,8 @@
 import os
 
 from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
-                                       Elasticsearch, Csv, Avro, Json)
+                                       Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor,
+                                       CustomFormatDescriptor)
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes
 from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -354,6 +355,22 @@ class ElasticsearchDescriptorTest(PyFlinkTestCase):
         self.assertEqual(expected, properties)
 
 
+class CustomConnectorDescriptorTests(PyFlinkTestCase):
+
+    def test_custom_connector(self):
+        custom_connector = CustomConnectorDescriptor('kafka', 1, True) \
+            .property('connector.topic', 'topic1')\
+            .properties({'connector.version': '0.11', 'connector.startup-mode': 'earliest-offset'})
+
+        properties = custom_connector.to_properties()
+        expected = {'connector.type': 'kafka',
+                    'connector.property-version': '1',
+                    'connector.topic': 'topic1',
+                    'connector.version': '0.11',
+                    'connector.startup-mode': 'earliest-offset'}
+        self.assertEqual(expected, properties)
+
+
 class OldCsvDescriptorTests(PyFlinkTestCase):
 
     def test_field_delimiter(self):
@@ -663,6 +680,22 @@ class JsonDescriptorTests(PyFlinkTestCase):
         self.assertEqual(expected, properties)
 
 
+class CustomFormatDescriptorTests(PyFlinkTestCase):
+
+    def test_custom_format_descriptor(self):
+        custom_format = CustomFormatDescriptor('json', 1) \
+            .property('format.schema', 'ROW<a INT, b VARCHAR>') \
+            .properties({'format.fail-on-missing-field': 'true'})
+
+        expected = {'format.fail-on-missing-field': 'true',
+                    'format.schema': 'ROW<a INT, b VARCHAR>',
+                    'format.property-version': '1',
+                    'format.type': 'json'}
+
+        properties = custom_format.to_properties()
+        self.assertEqual(expected, properties)
+
+
 class RowTimeDescriptorTests(PyFlinkTestCase):
 
     def test_timestamps_from_field(self):
diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java
new file mode 100644
index 0000000..34a3c71
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Map;
+
+/**
+ * Describes a custom connector to another system.
+ */
+@Internal
+public class CustomConnectorDescriptor extends ConnectorDescriptor {
+
+	private final DescriptorProperties properties;
+
+	/**
+	 * Constructs a {@link CustomConnectorDescriptor}.
+	 *
+	 * @param type String that identifies this connector.
+	 * @param version Property version for backwards compatibility.
+	 * @param formatNeeded Flag for basic validation of a needed format descriptor.
+	 */
+	public CustomConnectorDescriptor(String type, int version, boolean formatNeeded) {
+		super(type, version, formatNeeded);
+		properties = new DescriptorProperties();
+	}
+
+	/**
+	 * Adds a configuration property for the connector.
+	 *
+	 * @param key The property key to be set.
+	 * @param value The property value to be set.
+	 */
+	public CustomConnectorDescriptor property(String key, String value) {
+		properties.putString(key, value);
+		return this;
+	}
+
+	/**
+	 * Adds a set of properties for the connector.
+	 *
+	 * @param properties The properties to add.
+	 */
+	public CustomConnectorDescriptor properties(Map<String, String> properties) {
+		this.properties.putProperties(properties);
+		return this;
+	}
+
+	@Override
+	protected Map<String, String> toConnectorProperties() {
+		return properties.asMap();
+	}
+}
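
A note on how the final property map is assembled: connector.type and
connector.property-version are contributed by the ConnectorDescriptor base class, while
toConnectorProperties() above supplies only the user-defined keys. Seen from Python (a
sketch; the values mirror the expected map in test_descriptor.py above):

    from pyflink.table.descriptors import CustomConnectorDescriptor

    connector = CustomConnectorDescriptor('kafka', 1, True) \
        .property('connector.topic', 'topic1')

    # 'connector.type' and 'connector.property-version' are added by the base
    # class; 'connector.topic' is passed through from toConnectorProperties().
    assert connector.to_properties() == {'connector.type': 'kafka',
                                         'connector.property-version': '1',
                                         'connector.topic': 'topic1'}
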
diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java
new file mode 100644
index 0000000..0ea17dc
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+
+import java.util.Map;
+
+/**
+ * Describes the custom format of data.
+ */
+@Internal
+public class CustomFormatDescriptor extends FormatDescriptor {
+
+	private final DescriptorProperties properties;
+
+	/**
+	 * Constructs a {@link CustomFormatDescriptor}.
+	 *
+	 * @param type String that identifies this format.
+	 * @param version Property version for backwards compatibility.
+	 */
+	public CustomFormatDescriptor(String type, int version) {
+		super(type, version);
+		properties = new DescriptorProperties();
+	}
+
+	/**
+	 * Adds a configuration property for the format.
+	 *
+	 * @param key The property key to be set.
+	 * @param value The property value to be set.
+	 */
+	public CustomFormatDescriptor property(String key, String value) {
+		properties.putString(key, value);
+		return this;
+	}
+
+	/**
+	 * Adds a set of properties for the format.
+	 *
+	 * @param properties The properties to add.
+	 */
+	public CustomFormatDescriptor properties(Map<String, String> properties) {
+		this.properties.putProperties(properties);
+		return this;
+	}
+
+	@Override
+	protected Map<String, String> toFormatProperties() {
+		return properties.asMap();
+	}
+}
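
CustomFormatDescriptor mixes with the existing connector descriptors as well, since
ConnectTableDescriptor.with_format() accepts any FormatDescriptor. A sketch combining it
with the built-in Kafka descriptor (same assumptions as the sketch near the top; the
table name 'mySource' is again illustrative):

    from pyflink.table.descriptors import CustomFormatDescriptor, Kafka, Schema
    from pyflink.table.types import DataTypes

    t_env.connect(Kafka().version('0.11').topic('topic1').start_from_earliest()) \
        .with_format(CustomFormatDescriptor('json', 1)
                     .property('format.fail-on-missing-field', 'true')
                     .property('format.schema', 'ROW<a INT, b VARCHAR>')) \
        .with_schema(Schema()
                     .field('a', DataTypes.INT())
                     .field('b', DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source('mySource')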