You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/21 10:01:02 UTC

[GitHub] [flink] HuangXingBo commented on a change in pull request #18388: [FLINK-25530][python][connector/pulsar] Support Pulsar source connector in Python DataStream API.

HuangXingBo commented on a change in pull request #18388:
URL: https://github.com/apache/flink/pull/18388#discussion_r789493255



##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.
+    Since it would be serialized into split.
+    The implementation for this interface should be well considered.
+    I don't recommend adding extra internal state for this implementation.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    A interface for users to specify the stop position of a pulsar subscription. Since it would be
+    serialized into split. The implementation for this interface should be well considered. I don't
+    recommend adding extra internal state for this implementation.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
+    PulsarSource.

Review comment:
       I think you could add the usage example from the Java PulsarSource class to this docstring.

##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.

Review comment:
       ```suggestion
       A factory class for users to specify the start position of a pulsar subscription.
   ```

##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.
+    Since it would be serialized into split.
+    The implementation for this interface should be well considered.
+    I don't recommend adding extra internal state for this implementation.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    A interface for users to specify the stop position of a pulsar subscription. Since it would be
+    serialized into split. The implementation for this interface should be well considered. I don't
+    recommend adding extra internal state for this implementation.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
+    PulsarSource.
+    """
+
+    def __init__(self, j_pulsar_source):
+        super(PulsarSource, self).__init__(source=j_pulsar_source)
+
+    @staticmethod
+    def builder() -> 'PulsarSourceBuilder':
+        """
+        Get a PulsarSourceBuilder to builder a PulsarSource.
+        """
+        return PulsarSourceBuilder()
+
+
+class PulsarSourceBuilder(object):
+    """
+    The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.
+
+    The service url, admin url, subscription name, topics to consume, and the record deserializer
+    are required fields that must be set.
+
+    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).
+
+    By default the PulsarSource runs in an Boundedness.CONTINUOUS_UNBOUNDED mode and never stop
+    until the Flink job is canceled or fails. To let the PulsarSource run in
+    Boundedness.CONTINUOUS_UNBOUNDED but stops at some given offsets, one can call
+    set_unbounded_stop_cursor(StopCursor).

Review comment:
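       Ditto (same suggestion as above: add the usage example from the corresponding Java class to this docstring).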

##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.
+    Since it would be serialized into split.
+    The implementation for this interface should be well considered.
+    I don't recommend adding extra internal state for this implementation.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    A interface for users to specify the stop position of a pulsar subscription. Since it would be

Review comment:
       ```suggestion
       A factory class for users to specify the stop position of a pulsar subscription. Since it would be
   ```

##########
File path: flink-python/pyflink/datastream/tests/test_connectors.py
##########
@@ -149,6 +150,80 @@ def tearDown(self):
             get_gateway().jvm.Thread.currentThread().setContextClassLoader(self._cxt_clz_loader)
 
 
+class FlinkPulsarTest(PyFlinkTestCase):
+
+    def setUp(self) -> None:
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+        self.env.set_parallelism(2)
+        # Cache current ContextClassLoader, we will replace it with a temporary URLClassLoader to
+        # load specific connector jars with given module path to do dependency isolation. And We
+        # will change the ClassLoader back to the cached ContextClassLoader after the test case
+        # finished.
+        self._cxt_clz_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+        _load_specific_flink_module_jars('/flink-connectors/flink-sql-connector-pulsar')

Review comment:
       What about extracting a base class that the other connector test classes extend, to avoid this duplicated logic?

##########
File path: flink-connectors/flink-sql-connector-pulsar/pom.xml
##########
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.15-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-sql-connector-pulsar</artifactId>
+	<name>Flink : Connectors : SQL : Pulsar</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pulsar</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-connector-base</include>
+									<include>org.apache.flink:flink-connector-pulsar</include>
+									<include>org.apache.pulsar:*</include>
+									<include>org.slf4j:jul-to-slf4j</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+

Review comment:
       ```suggestion
   ```

##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.
+    Since it would be serialized into split.
+    The implementation for this interface should be well considered.
+    I don't recommend adding extra internal state for this implementation.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    A interface for users to specify the stop position of a pulsar subscription. Since it would be
+    serialized into split. The implementation for this interface should be well considered. I don't
+    recommend adding extra internal state for this implementation.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
+    PulsarSource.
+    """
+
+    def __init__(self, j_pulsar_source):
+        super(PulsarSource, self).__init__(source=j_pulsar_source)
+
+    @staticmethod
+    def builder() -> 'PulsarSourceBuilder':
+        """
+        Get a PulsarSourceBuilder to builder a PulsarSource.
+        """
+        return PulsarSourceBuilder()
+
+
+class PulsarSourceBuilder(object):
+    """
+    The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.
+
+    The service url, admin url, subscription name, topics to consume, and the record deserializer
+    are required fields that must be set.
+
+    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).
+
+    By default the PulsarSource runs in an Boundedness.CONTINUOUS_UNBOUNDED mode and never stop
+    until the Flink job is canceled or fails. To let the PulsarSource run in
+    Boundedness.CONTINUOUS_UNBOUNDED but stops at some given offsets, one can call
+    set_unbounded_stop_cursor(StopCursor).
+    """
+
+    def __init__(self):
+        JPulsarSource = \
+            get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
+        self._j_pulsar_source_builder = JPulsarSource.builder()
+
+    def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+        """
+        self._j_pulsar_source_builder.setAdminUrl(admin_url)
+        return self
+
+    def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the server's link for the PulsarConsumer of the PulsarSource.
+        """
+        self._j_pulsar_source_builder.setServiceUrl(service_url)
+        return self
+
+    def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the name for this pulsar subscription.
+        """
+        self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
+        return self
+
+    def set_subscription_type(self, subscription_type: SubscriptionType) -> 'PulsarSourceBuilder':
+        """
+        SubscriptionType is the consuming behavior for pulsar, we would generator different split
+        by the given subscription type. Please take some time to consider which subscription type
+        matches your application best. Default is SubscriptionType.Shared.
+        """
+        self._j_pulsar_source_builder.setSubscriptionType(
+            subscription_type._to_j_subscription_type())
+        return self
+
+    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':

Review comment:
       Could you add a test where the input argument is a list of strings (`List[str]`)?

##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
+    support both the pulsar's self managed schema and flink managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
+        consume the pulsar message as byte array and decode the message by using flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only used for treating message that was written into pulsar by TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only 1 consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumer will be able to use the same subscription name and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumer will be able to use the same subscription name but only 1 consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
+    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumer will be able to use the same subscription and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    A interface for users to specify the start position of a pulsar subscription.
+    Since it would be serialized into split.
+    The implementation for this interface should be well considered.
+    I don't recommend adding extra internal state for this implementation.
+
+    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    A interface for users to specify the stop position of a pulsar subscription. Since it would be
+    serialized into split. The implementation for this interface should be well considered. I don't
+    recommend adding extra internal state for this implementation.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
+    PulsarSource.
+    """
+
+    def __init__(self, j_pulsar_source):
+        super(PulsarSource, self).__init__(source=j_pulsar_source)
+
+    @staticmethod
+    def builder() -> 'PulsarSourceBuilder':
+        """
+        Get a PulsarSourceBuilder to builder a PulsarSource.
+        """
+        return PulsarSourceBuilder()
+
+
+class PulsarSourceBuilder(object):
+    """
+    The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.
+
+    The service url, admin url, subscription name, topics to consume, and the record deserializer
+    are required fields that must be set.
+
+    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).
+
+    By default the PulsarSource runs in an Boundedness.CONTINUOUS_UNBOUNDED mode and never stop
+    until the Flink job is canceled or fails. To let the PulsarSource run in
+    Boundedness.CONTINUOUS_UNBOUNDED but stops at some given offsets, one can call
+    set_unbounded_stop_cursor(StopCursor).
+    """
+
+    def __init__(self):
+        JPulsarSource = \
+            get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
+        self._j_pulsar_source_builder = JPulsarSource.builder()
+
+    def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+        """
+        self._j_pulsar_source_builder.setAdminUrl(admin_url)
+        return self
+
+    def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the server's link for the PulsarConsumer of the PulsarSource.
+        """
+        self._j_pulsar_source_builder.setServiceUrl(service_url)
+        return self
+
+    def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
+        """
+        Sets the name for this pulsar subscription.
+        """
+        self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
+        return self
+
+    def set_subscription_type(self, subscription_type: SubscriptionType) -> 'PulsarSourceBuilder':
+        """
+        SubscriptionType is the consuming behavior for pulsar, we would generator different split
+        by the given subscription type. Please take some time to consider which subscription type
+        matches your application best. Default is SubscriptionType.Shared.
+        """
+        self._j_pulsar_source_builder.setSubscriptionType(
+            subscription_type._to_j_subscription_type())
+        return self
+
+    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
+        """
+        Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
+        non-existed topic wouldn't throw any exception. But the best solution is just consuming by
+        using a topic regex. You can set topics once either with setTopics or setTopicPattern in
+        this builder.
+        """
+        if not isinstance(topics, list):
+            topics = [topics]
+        self._j_pulsar_source_builder.setTopics(topics)
+        return self
+
+    def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':

Review comment:
       Could you add a test to cover this method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org