Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 07:52:44 UTC

[flink] 01/02: [FLINK-27966][python] Make the connector imports more explicit by adding connector type

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dd2b19995beb766f0cd0986b079f2d210c1e836
Author: Dian Fu <di...@apache.org>
AuthorDate: Mon Aug 8 11:18:13 2022 +0800

    [FLINK-27966][python] Make the connector imports more explicit by adding connector type
---
 .../docs/dev/python/datastream/data_types.md       |   2 +-
 .../python/datastream/intro_to_datastream_api.md   |  11 +-
 .../docs/dev/python/datastream_tutorial.md         |   3 +-
 docs/content.zh/docs/dev/table/data_stream_api.md  |  12 +-
 .../docs/dev/python/datastream/data_types.md       |   2 +-
 .../python/datastream/intro_to_datastream_api.md   |  11 +-
 .../content/docs/dev/python/datastream_tutorial.md |   3 +-
 docs/content/docs/dev/table/data_stream_api.md     |  10 +-
 flink-python/pyflink/datastream/__init__.py        |  41 +++---
 .../pyflink/datastream/connectors/__init__.py      | 145 ++++++++-------------
 flink-python/pyflink/datastream/connectors/base.py |  28 ++--
 .../pyflink/datastream/connectors/cassandra.py     |  13 +-
 .../pyflink/datastream/connectors/file_system.py   |  15 ++-
 flink-python/pyflink/datastream/connectors/jdbc.py |   7 +
 .../pyflink/datastream/connectors/kafka.py         |   8 +-
 .../pyflink/datastream/connectors/kinesis.py       |  18 ++-
 .../pyflink/datastream/connectors/number_seq.py    |   6 +-
 .../pyflink/datastream/connectors/pulsar.py        |  15 +++
 .../pyflink/datastream/connectors/rabbitmq.py      |   7 +
 .../datastream/connectors/tests/test_connectors.py |  27 ++--
 flink-python/pyflink/datastream/formats/csv.py     |   4 +-
 .../tests/test_stream_execution_environment.py     |   2 +-
 .../datastream/connectors/kafka_avro_format.py     |   2 +-
 .../datastream/connectors/kafka_csv_format.py      |   2 +-
 .../datastream/connectors/kafka_json_format.py     |   2 +-
 .../examples/datastream/connectors/pulsar.py       |   6 +-
 .../windowing/session_with_dynamic_gap_window.py   |   2 +-
 .../windowing/session_with_gap_window.py           |   2 +-
 .../datastream/windowing/sliding_time_window.py    |   2 +-
 .../datastream/windowing/tumbling_count_window.py  |   2 +-
 .../datastream/windowing/tumbling_time_window.py   |   2 +-
 .../pyflink/examples/datastream/word_count.py      |   4 +-
 32 files changed, 224 insertions(+), 192 deletions(-)
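
For reference, a minimal sketch of the import change this commit applies across the docs and examples (module paths taken directly from the patch; the old root-level imports remain available through the compatibility shim in connectors/__init__.py below):

    # before: everything imported from the connectors package root
    from pyflink.datastream.connectors import FileSink, FlinkKafkaConsumer

    # after: the connector type is explicit in the module path
    from pyflink.datastream.connectors.file_system import FileSink
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer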

diff --git a/docs/content.zh/docs/dev/python/datastream/data_types.md b/docs/content.zh/docs/dev/python/datastream/data_types.md
index e095d164628..6afb610a526 100644
--- a/docs/content.zh/docs/dev/python/datastream/data_types.md
+++ b/docs/content.zh/docs/dev/python/datastream/data_types.md
@@ -66,7 +66,7 @@ For example, types need to be provided if you want to output data using the File
 from pyflink.common.serialization import Encoder
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
 
 
 def file_sink():
diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
index 730560deb3e..5eefc0376b2 100644
--- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -43,7 +43,8 @@ from pyflink.common import WatermarkStrategy, Row
 from pyflink.common.serialization import Encoder
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 from pyflink.datastream.functions import RuntimeContext, MapFunction
 from pyflink.datastream.state import ValueStateDescriptor
 
@@ -156,7 +157,7 @@ You can also create a `DataStream` using DataStream connectors with method `add_
 from pyflink.common.serialization import JsonRowDeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
 
 env = StreamExecutionEnvironment.get_execution_environment()
 # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -186,7 +187,7 @@ source connectors:
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import NumberSequenceSource
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 
 env = StreamExecutionEnvironment.get_execution_environment()
 seq_num_source = NumberSequenceSource(1, 1000)
@@ -303,7 +304,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
 
 ```python
 from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
 from pyflink.common.serialization import JsonRowSerializationSchema
 
 serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
@@ -327,7 +328,7 @@ You could also call the `sink_to` method to emit the data of a `DataStream` to a
 sink connector:
 
 ```python
-from pyflink.datastream.connectors import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
 from pyflink.common.serialization import Encoder
 
 output_path = '/opt/output/'
diff --git a/docs/content.zh/docs/dev/python/datastream_tutorial.md b/docs/content.zh/docs/dev/python/datastream_tutorial.md
index d9f7543052e..51bd05b54a5 100644
--- a/docs/content.zh/docs/dev/python/datastream_tutorial.md
+++ b/docs/content.zh/docs/dev/python/datastream_tutorial.md
@@ -123,8 +123,7 @@ import sys
 
 from pyflink.common import WatermarkStrategy, Encoder, Types
 from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
-                                           RollingPolicy)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
 
 
 word_count_data = ["To be, or not to be,--that is the question:--",
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 1e6adcdc11f..5aa882cdca6 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -2892,9 +2892,9 @@ env.execute()
 ```python
 from pyflink.common import Encoder
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
 from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
-    
+
 env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 
@@ -2928,10 +2928,10 @@ statement_set.add_insert(sink_descriptor, table_from_stream)
 statement_set.attach_as_datastream()
 
 # define other DataStream API parts
-env.from_collection([4, 5, 6])
-    .add_sink(FileSink
-              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
-              .build())
+env.from_collection([4, 5, 6]) \
+   .add_sink(FileSink
+             .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+             .build())
 
 # use DataStream API to submit the pipelines
 env.execute()
diff --git a/docs/content/docs/dev/python/datastream/data_types.md b/docs/content/docs/dev/python/datastream/data_types.md
index ea94131b182..e8058507211 100644
--- a/docs/content/docs/dev/python/datastream/data_types.md
+++ b/docs/content/docs/dev/python/datastream/data_types.md
@@ -66,7 +66,7 @@ For example, types need to be provided if you want to output data using the File
 from pyflink.common.serialization import Encoder
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
 
 
 def file_sink():
diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
index 7a5ea957d51..daedb0f418b 100644
--- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -43,7 +43,8 @@ from pyflink.common import WatermarkStrategy, Row
 from pyflink.common.serialization import Encoder
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 from pyflink.datastream.functions import RuntimeContext, MapFunction
 from pyflink.datastream.state import ValueStateDescriptor
 
@@ -156,7 +157,7 @@ You can also create a `DataStream` using DataStream connectors with method `add_
 from pyflink.common.serialization import JsonRowDeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
 
 env = StreamExecutionEnvironment.get_execution_environment()
 # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -186,7 +187,7 @@ source connectors:
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import NumberSequenceSource
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 
 env = StreamExecutionEnvironment.get_execution_environment()
 seq_num_source = NumberSequenceSource(1, 1000)
@@ -303,7 +304,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
 
 ```python
 from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
 from pyflink.common.serialization import JsonRowSerializationSchema
 
 serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
@@ -327,7 +328,7 @@ You could also call the `sink_to` method to emit the data of a `DataStream` to a
 sink connector:
 
 ```python
-from pyflink.datastream.connectors import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
 from pyflink.common.serialization import Encoder
 
 output_path = '/opt/output/'
diff --git a/docs/content/docs/dev/python/datastream_tutorial.md b/docs/content/docs/dev/python/datastream_tutorial.md
index 65462290477..b8187008d17 100644
--- a/docs/content/docs/dev/python/datastream_tutorial.md
+++ b/docs/content/docs/dev/python/datastream_tutorial.md
@@ -122,8 +122,7 @@ import sys
 
 from pyflink.common import WatermarkStrategy, Encoder, Types
 from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
-                                           RollingPolicy)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
 
 
 word_count_data = ["To be, or not to be,--that is the question:--",
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 02d67a20931..ba0753c04bb 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -2890,7 +2890,7 @@ env.execute()
 ```python
 from pyflink.common import Encoder
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
 from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
     
 env = StreamExecutionEnvironment.get_execution_environment()
@@ -2926,10 +2926,10 @@ statement_set.add_insert(sink_descriptor, table_from_stream)
 statement_set.attach_as_datastream()
 
 # define other DataStream API parts
-env.from_collection([4, 5, 6])
-    .add_sink(FileSink
-              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
-              .build())
+env.from_collection([4, 5, 6]) \
+   .add_sink(FileSink
+             .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+             .build())
 
 # use DataStream API to submit the pipelines
 env.execute()
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 85035363b5f..32a3d092266 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -165,37 +165,48 @@ Classes to define source & sink:
 
     - :class:`connectors.elasticsearch.ElasticsearchSink`:
       A sink for publishing data into Elasticsearch 6 or Elasticsearch 7.
-    - :class:`connectors.FlinkKafkaConsumer`:
+    - :class:`connectors.kafka.FlinkKafkaConsumer`:
       A streaming data source that pulls a parallel data stream from Apache Kafka.
-    - :class:`connectors.FlinkKafkaProducer`:
+    - :class:`connectors.kafka.FlinkKafkaProducer`:
       A streaming data sink to produce data into a Kafka topic.
-    - :class:`connectors.KafkaSource`:
+    - :class:`connectors.kafka.KafkaSource`:
       The new API to read data in parallel from Apache Kafka.
-    - :class:`connectors.KafkaSink`:
+    - :class:`connectors.kafka.KafkaSink`:
       The new API to write data into to Apache Kafka topics.
-    - :class:`connectors.FileSource`:
+    - :class:`connectors.file_system.FileSource`:
       A unified data source that reads files - both in batch and in streaming mode.
       This source supports all (distributed) file systems and object stores that can be accessed via
       the Flink's FileSystem class.
-    - :class:`connectors.FileSink`:
+    - :class:`connectors.file_system.FileSink`:
       A unified sink that emits its input elements to FileSystem files within buckets. This
       sink achieves exactly-once semantics for both BATCH and STREAMING.
-    - :class:`connectors.NumberSequenceSource`:
+    - :class:`connectors.file_system.StreamingFileSink`:
+      Sink that emits its input elements to files within buckets. This is integrated with the
+      checkpointing mechanism to provide exactly once semantics.
+    - :class:`connectors.number_seq.NumberSequenceSource`:
       A data source that produces a sequence of numbers (longs). This source is useful for testing
       and for cases that just need a stream of N events of any kind.
-    - :class:`connectors.JdbcSink`:
+    - :class:`connectors.jdbc.JdbcSink`:
       A data sink to produce data into an external storage using JDBC.
-    - :class:`connectors.StreamingFileSink`:
-      Sink that emits its input elements to files within buckets. This is integrated with the
-      checkpointing mechanism to provide exactly once semantics.
-    - :class:`connectors.PulsarSource`:
+    - :class:`connectors.pulsar.PulsarSource`:
       A streaming data source that pulls a parallel data stream from Pulsar.
-    - :class:`connectors.PulsarSink`:
+    - :class:`connectors.pulsar.PulsarSink`:
       A streaming data sink to produce data into Pulsar.
-    - :class:`connectors.RMQSource`:
+    - :class:`connectors.rabbitmq.RMQSource`:
       A streaming data source that pulls a parallel data stream from RabbitMQ.
-    - :class:`connectors.RMQSink`:
+    - :class:`connectors.rabbitmq.RMQSink`:
       A Sink for publishing data into RabbitMQ.
+    - :class:`connectors.cassandra.CassandraSink`:
+      A Sink for publishing data into Cassandra.
+    - :class:`connectors.kinesis.FlinkKinesisConsumer`:
+      A streaming data source that pulls a parallel data stream from Kinesis.
+    - :class:`connectors.kinesis.KinesisStreamsSink`:
+      A Kinesis Data Streams (KDS) Sink that performs async requests against a destination stream
+      using the buffering protocol.
+    - :class:`connectors.kinesis.KinesisFirehoseSink`:
+      A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+      stream using the buffering protocol.
+
 
 Classes to define formats used together with source & sink:
 
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index c7581cbdf34..02681d299a8 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -16,99 +16,64 @@
 # limitations under the License.
 ################################################################################
 from pyflink.datastream.connectors.base import Sink, Source, DeliveryGuarantee
-from pyflink.datastream.connectors.elasticsearch import (Elasticsearch6SinkBuilder,
-                                                         Elasticsearch7SinkBuilder)
-from pyflink.datastream.connectors.file_system import (
-    BucketAssigner,
-    BulkFormat,
-    BulkWriterFactory,
-    FileEnumeratorProvider,
-    FileSink,
-    FileSplitAssignerProvider,
-    FileSource,
-    FileSourceBuilder,
-    OutputFileConfig,
-    RollingPolicy,
-    StreamFormat,
-    StreamingFileSink,
-)
-from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
-from pyflink.datastream.connectors.kafka import (
-    FlinkKafkaConsumer,
-    FlinkKafkaProducer,
-    Semantic,
-    KafkaSource,
-    KafkaSourceBuilder,
-    KafkaTopicPartition,
-    KafkaOffsetsInitializer,
-    KafkaOffsetResetStrategy,
-    KafkaSink,
-    KafkaSinkBuilder,
-    KafkaRecordSerializationSchema,
-    KafkaRecordSerializationSchemaBuilder,
-    KafkaTopicSelector,
-)
-from pyflink.datastream.connectors.number_seq import NumberSequenceSource
-from pyflink.datastream.connectors.pulsar import PulsarDeserializationSchema, PulsarSource, \
-    PulsarSourceBuilder, SubscriptionType, StartCursor, StopCursor, PulsarSerializationSchema, \
-    PulsarSink, PulsarSinkBuilder, MessageDelayer, TopicRoutingMode
-from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource, RMQSink
-from pyflink.datastream.connectors.kinesis import (FlinkKinesisConsumer, KinesisStreamsSink,
-                                                   KinesisFirehoseSink)
-from pyflink.datastream.connectors.cassandra import CassandraSink
 
 
 __all__ = [
     'Sink',
     'Source',
-    'DeliveryGuarantee',
-    'FileEnumeratorProvider',
-    'FileSink',
-    'FileSource',
-    'BucketAssigner',
-    'FileSourceBuilder',
-    'FileSplitAssignerProvider',
-    'FlinkKafkaConsumer',
-    'FlinkKafkaProducer',
-    'Semantic',
-    'KafkaSource',
-    'KafkaSourceBuilder',
-    'KafkaTopicPartition',
-    'KafkaOffsetsInitializer',
-    'KafkaOffsetResetStrategy',
-    'KafkaSink',
-    'KafkaSinkBuilder',
-    'KafkaRecordSerializationSchema',
-    'KafkaRecordSerializationSchemaBuilder',
-    'KafkaTopicSelector',
-    'JdbcSink',
-    'JdbcConnectionOptions',
-    'JdbcExecutionOptions',
-    'NumberSequenceSource',
-    'OutputFileConfig',
-    'PulsarDeserializationSchema',
-    'PulsarSource',
-    'PulsarSourceBuilder',
-    'SubscriptionType',
-    'PulsarSerializationSchema',
-    'PulsarSink',
-    'PulsarSinkBuilder',
-    'MessageDelayer',
-    'TopicRoutingMode',
-    'RMQConnectionConfig',
-    'RMQSource',
-    'RMQSink',
-    'RollingPolicy',
-    'StartCursor',
-    'StopCursor',
-    'BulkFormat',
-    'StreamFormat',
-    'BulkWriterFactory',
-    'StreamingFileSink',
-    'FlinkKinesisConsumer',
-    'KinesisStreamsSink',
-    'KinesisFirehoseSink',
-    'Elasticsearch6SinkBuilder',
-    'Elasticsearch7SinkBuilder',
-    'CassandraSink'
+    'DeliveryGuarantee'
 ]
+
+
+def _install():
+    from pyflink.datastream import connectors
+
+    # number_seq
+    from pyflink.datastream.connectors import number_seq
+    setattr(connectors, 'NumberSequenceSource', number_seq.NumberSequenceSource)
+
+    # jdbc
+    from pyflink.datastream.connectors import jdbc
+    setattr(connectors, 'JdbcSink', jdbc.JdbcSink)
+    setattr(connectors, 'JdbcConnectionOptions', jdbc.JdbcConnectionOptions)
+    setattr(connectors, 'JdbcExecutionOptions', jdbc.JdbcExecutionOptions)
+
+    # kafka
+    from pyflink.datastream.connectors import kafka
+    setattr(connectors, 'KafkaSource', kafka.KafkaSource)
+    setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
+    setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
+    setattr(connectors, 'Semantic', kafka.Semantic)
+
+    # pulsar
+    from pyflink.datastream.connectors import pulsar
+    setattr(connectors, 'PulsarSource', pulsar.PulsarSource)
+    setattr(connectors, 'PulsarSourceBuilder', pulsar.PulsarSourceBuilder)
+    setattr(connectors, 'PulsarDeserializationSchema', pulsar.PulsarDeserializationSchema)
+    setattr(connectors, 'SubscriptionType', pulsar.SubscriptionType)
+    setattr(connectors, 'StartCursor', pulsar.StartCursor)
+    setattr(connectors, 'StopCursor', pulsar.StopCursor)
+
+    # rabbitmq
+    from pyflink.datastream.connectors import rabbitmq
+    setattr(connectors, 'RMQSource', rabbitmq.RMQSource)
+    setattr(connectors, 'RMQSink', rabbitmq.RMQSink)
+    setattr(connectors, 'RMQConnectionConfig', rabbitmq.RMQConnectionConfig)
+
+    # filesystem
+    from pyflink.datastream.connectors import file_system
+    setattr(connectors, 'BucketAssigner', file_system.BucketAssigner)
+    setattr(connectors, 'FileEnumeratorProvider', file_system.FileEnumeratorProvider)
+    setattr(connectors, 'FileSink', file_system.FileSink)
+    setattr(connectors, 'FileSplitAssignerProvider', file_system.FileSplitAssignerProvider)
+    setattr(connectors, 'FileSource', file_system.FileSource)
+    setattr(connectors, 'FileSourceBuilder', file_system.FileSourceBuilder)
+    setattr(connectors, 'OutputFileConfig', file_system.OutputFileConfig)
+    setattr(connectors, 'RollingPolicy', file_system.RollingPolicy)
+    setattr(connectors, 'StreamFormat', file_system.StreamFormat)
+    setattr(connectors, 'StreamingFileSink', file_system.StreamingFileSink)
+
+
+# for backward compatibility
+_install()
+del _install
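
Note on the _install() helper above: it re-attaches the commonly used connector classes onto the pyflink.datastream.connectors package at import time, so user code written against the old root-level imports keeps working. A minimal sketch of both styles resolving to the same class (assuming a PyFlink build that includes this patch):

    from pyflink.datastream import connectors
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer

    # the root-level attribute is wired up by _install() for backward compatibility
    assert connectors.FlinkKafkaConsumer is FlinkKafkaConsumer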
diff --git a/flink-python/pyflink/datastream/connectors/base.py b/flink-python/pyflink/datastream/connectors/base.py
index fa8732a2cc6..77d268f860f 100644
--- a/flink-python/pyflink/datastream/connectors/base.py
+++ b/flink-python/pyflink/datastream/connectors/base.py
@@ -53,20 +53,6 @@ class Sink(JavaFunctionWrapper):
         super(Sink, self).__init__(sink)
 
 
-class StreamTransformer(ABC):
-
-    @abstractmethod
-    def apply(self, ds):
-        pass
-
-
-class SupportsPreprocessing(ABC):
-
-    @abstractmethod
-    def get_transformer(self) -> Optional[StreamTransformer]:
-        pass
-
-
 class DeliveryGuarantee(Enum):
     """
     DeliverGuarantees that can be chosen. In general your pipeline can only offer the lowest
@@ -97,3 +83,17 @@ class DeliveryGuarantee(Enum):
         JDeliveryGuarantee = get_gateway().jvm \
             .org.apache.flink.connector.base.DeliveryGuarantee
         return getattr(JDeliveryGuarantee, self.name)
+
+
+class StreamTransformer(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class SupportsPreprocessing(ABC):
+
+    @abstractmethod
+    def get_transformer(self) -> Optional[StreamTransformer]:
+        pass
diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py
index 4cca90753e5..5cd099d47d8 100644
--- a/flink-python/pyflink/datastream/connectors/cassandra.py
+++ b/flink-python/pyflink/datastream/connectors/cassandra.py
@@ -21,11 +21,14 @@ from pyflink.common import Duration
 from pyflink.java_gateway import get_gateway
 
 
-__all__ = ['ConsistencyLevel',
-           'MapperOptions',
-           'ClusterBuilder',
-           'CassandraCommitter',
-           'CassandraFailureHandler']
+__all__ = [
+    'CassandraSink',
+    'ConsistencyLevel',
+    'MapperOptions',
+    'ClusterBuilder',
+    'CassandraCommitter',
+    'CassandraFailureHandler'
+]
 
 # ---- Classes introduced to construct the MapperOptions ----
 
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 6a8816457bf..c8236333921 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -33,7 +33,20 @@ from pyflink.util.java_utils import to_jarray
 
 __all__ = [
     'FileCompactor',
-    'FileCompactStrategy'
+    'FileCompactStrategy',
+    'OutputFileConfig',
+    'FileSource',
+    'FileSourceBuilder',
+    'FileSink',
+    'StreamingFileSink',
+    'BulkFormat',
+    'StreamFormat',
+    'InputFormat',
+    'BulkWriterFactory',
+    'FileEnumeratorProvider',
+    'FileSplitAssignerProvider',
+    'RollingPolicy',
+    'BucketAssigner'
 ]
 
 
diff --git a/flink-python/pyflink/datastream/connectors/jdbc.py b/flink-python/pyflink/datastream/connectors/jdbc.py
index a7f1c985461..5270a0be918 100644
--- a/flink-python/pyflink/datastream/connectors/jdbc.py
+++ b/flink-python/pyflink/datastream/connectors/jdbc.py
@@ -21,6 +21,13 @@ from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import to_jarray
 
 
+__all__ = [
+    'JdbcSink',
+    'JdbcConnectionOptions',
+    'JdbcExecutionOptions'
+]
+
+
 class JdbcSink(SinkFunction):
 
     def __init__(self, j_jdbc_sink):
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py
index 707dd679046..e7d98f079d2 100644
--- a/flink-python/pyflink/datastream/connectors/kafka.py
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -34,17 +34,17 @@ from pyflink.util.java_utils import to_jarray, get_field, get_field_value
 __all__ = [
     'FlinkKafkaConsumer',
     'FlinkKafkaProducer',
-    'Semantic',
     'KafkaSource',
     'KafkaSourceBuilder',
+    'KafkaSink',
+    'KafkaSinkBuilder',
+    'Semantic',
     'KafkaTopicPartition',
     'KafkaOffsetsInitializer',
     'KafkaOffsetResetStrategy',
-    'KafkaSink',
-    'KafkaSinkBuilder',
     'KafkaRecordSerializationSchema',
     'KafkaRecordSerializationSchemaBuilder',
-    'KafkaTopicSelector',
+    'KafkaTopicSelector'
 ]
 
 
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py b/flink-python/pyflink/datastream/connectors/kinesis.py
index 06d18e7ed8c..ebf4666f2d1 100644
--- a/flink-python/pyflink/datastream/connectors/kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -23,13 +23,17 @@ from pyflink.datastream.functions import SourceFunction
 from pyflink.datastream.connectors import Sink
 from pyflink.java_gateway import get_gateway
 
-__all__ = ['KinesisShardAssigner',
-           'KinesisDeserializationSchema',
-           'WatermarkTracker',
-           'PartitionKeyGenerator',
-           'FlinkKinesisConsumer',
-           'KinesisStreamsSink',
-           'KinesisFirehoseSink']
+__all__ = [
+    'KinesisShardAssigner',
+    'KinesisDeserializationSchema',
+    'WatermarkTracker',
+    'PartitionKeyGenerator',
+    'FlinkKinesisConsumer',
+    'KinesisStreamsSink',
+    'KinesisStreamsSinkBuilder',
+    'KinesisFirehoseSink',
+    'KinesisFirehoseSinkBuilder'
+]
 
 
 # ---- KinesisSource ----
diff --git a/flink-python/pyflink/datastream/connectors/number_seq.py b/flink-python/pyflink/datastream/connectors/number_seq.py
index fca9a28703f..ee5cba987b3 100644
--- a/flink-python/pyflink/datastream/connectors/number_seq.py
+++ b/flink-python/pyflink/datastream/connectors/number_seq.py
@@ -18,6 +18,10 @@
 from pyflink.datastream.connectors import Source
 from pyflink.java_gateway import get_gateway
 
+__all__ = [
+    'NumberSequenceSource'
+]
+
 
 class NumberSequenceSource(Source):
     """
@@ -33,7 +37,7 @@ class NumberSequenceSource(Source):
     because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
     """
 
-    def __init__(self, start, end):
+    def __init__(self, start: int, end: int):
         """
         Creates a new NumberSequenceSource that produces parallel sequences covering the
         range start to end (both boundaries are inclusive).
diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py
index 79efca87f04..c9ad53dfd62 100644
--- a/flink-python/pyflink/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/pulsar.py
@@ -26,6 +26,21 @@ from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import load_java_class
 
 
+__all__ = [
+    'PulsarSource',
+    'PulsarSourceBuilder',
+    'PulsarDeserializationSchema',
+    'SubscriptionType',
+    'StartCursor',
+    'StopCursor',
+    'PulsarSink',
+    'PulsarSinkBuilder',
+    'PulsarSerializationSchema',
+    'MessageDelayer',
+    'TopicRoutingMode'
+]
+
+
 # ---- PulsarSource ----
 
 
diff --git a/flink-python/pyflink/datastream/connectors/rabbitmq.py b/flink-python/pyflink/datastream/connectors/rabbitmq.py
index e97eb42a073..b0f6ed45e7f 100644
--- a/flink-python/pyflink/datastream/connectors/rabbitmq.py
+++ b/flink-python/pyflink/datastream/connectors/rabbitmq.py
@@ -20,6 +20,13 @@ from pyflink.datastream.functions import SinkFunction, SourceFunction
 from pyflink.java_gateway import get_gateway
 
 
+__all__ = [
+    'RMQConnectionConfig',
+    'RMQSource',
+    'RMQSink'
+]
+
+
 class RMQConnectionConfig(object):
     """
     Connection Configuration for RMQ.
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
index d6631fe3a02..1d12083cd3e 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
@@ -16,23 +16,26 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
-    FlushBackoffType, ElasticsearchEmitter
-
 from pyflink.common import typeinfo, Duration, WatermarkStrategy, ConfigOptions
 from pyflink.common.serialization import JsonRowDeserializationSchema, \
     JsonRowSerializationSchema, Encoder, SimpleStringSchema
 from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer, JdbcSink, \
-    JdbcConnectionOptions, JdbcExecutionOptions, StreamingFileSink, \
-    OutputFileConfig, FileSource, StreamFormat, FileEnumeratorProvider, FileSplitAssignerProvider, \
-    NumberSequenceSource, RollingPolicy, FileSink, BucketAssigner, RMQSink, RMQSource, \
-    RMQConnectionConfig, PulsarSource, StartCursor, PulsarDeserializationSchema, StopCursor, \
-    SubscriptionType, PulsarSink, PulsarSerializationSchema, DeliveryGuarantee, TopicRoutingMode, \
-    MessageDelayer, FlinkKinesisConsumer, KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.datastream.connectors import DeliveryGuarantee
 from pyflink.datastream.connectors.cassandra import CassandraSink, MapperOptions, ConsistencyLevel
-from pyflink.datastream.connectors.file_system import FileCompactStrategy, FileCompactor
-from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
+    FlushBackoffType, ElasticsearchEmitter
+from pyflink.datastream.connectors.file_system import FileCompactStrategy, FileCompactor, \
+    StreamingFileSink, OutputFileConfig, FileSource, StreamFormat, FileEnumeratorProvider, \
+    FileSplitAssignerProvider, RollingPolicy, FileSink, BucketAssigner
+from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
+from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, FlinkKinesisConsumer, \
+    KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.datastream.connectors.pulsar import PulsarSerializationSchema, TopicRoutingMode, \
+    MessageDelayer, PulsarSink, PulsarSource, StartCursor, PulsarDeserializationSchema, \
+    StopCursor, SubscriptionType
+from pyflink.datastream.connectors.rabbitmq import RMQSink, RMQSource, RMQConnectionConfig
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import invoke_java_object_method, PyFlinkStreamingTestCase
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index e5d18dd2309..d845bde6449 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -18,8 +18,8 @@
 from typing import Optional
 
 from pyflink.common.typeinfo import _from_java_type
-from pyflink.datastream.connectors import StreamFormat
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory, \
+    StreamFormat
 from pyflink.java_gateway import get_gateway
 from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
 
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 06ce00eb1c8..c80ab5f7d9b 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -32,7 +32,7 @@ from pyflink.common.typeinfo import Types
 from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
                                 CheckpointingMode, MemoryStateBackend, TimeCharacteristic,
                                 SlotSharingGroup)
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
 from pyflink.datastream.functions import SourceFunction
 from pyflink.datastream.slot_sharing_group import MemorySize
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index 9e3b2658d89..dd3711aff90 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -20,7 +20,7 @@ import sys
 
 from pyflink.common import AvroRowSerializationSchema, Types, AvroRowDeserializationSchema
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 21aa3ab17a6..99d5d62e90c 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -20,7 +20,7 @@ import sys
 
 from pyflink.common import Types, JsonRowDeserializationSchema, CsvRowSerializationSchema
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index 10bf85394a2..cf650fdb731 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -20,7 +20,7 @@ import sys
 
 from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
index 2278d522dbf..8e0d60879f7 100644
--- a/flink-python/pyflink/examples/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -21,9 +21,9 @@ import sys
 
 from pyflink.common import SimpleStringSchema, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import PulsarSource, PulsarSink, PulsarSerializationSchema, \
-    StartCursor, StopCursor, SubscriptionType, PulsarDeserializationSchema, DeliveryGuarantee, \
-    TopicRoutingMode
+from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,\
+    PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
+    PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode
 
 if __name__ == '__main__':
     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
index ebfce5212d0..82cf8a991a2 100644
--- a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
@@ -20,7 +20,7 @@ import sys
 import argparse
 from typing import Iterable
 
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
 
 from pyflink.common import Types, WatermarkStrategy, Encoder
 from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
index 2cca566264d..39ba8093abe 100644
--- a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
@@ -20,7 +20,7 @@ import sys
 import argparse
 from typing import Iterable
 
-from pyflink.datastream.connectors import FileSink, RollingPolicy, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
 
 from pyflink.common import Types, WatermarkStrategy, Time, Encoder
 from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
index 3f68d8439d9..287a431614f 100644
--- a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
@@ -20,7 +20,7 @@ import sys
 import argparse
 from typing import Iterable
 
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
 
 from pyflink.common import Types, WatermarkStrategy, Time, Encoder
 from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
index 084757608e0..580dd84ec6f 100644
--- a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
@@ -20,7 +20,7 @@ import sys
 import argparse
 from typing import Iterable
 
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
 
 from pyflink.common import Types, Encoder
 from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
index 0576fe6dc90..3fbd150fcc2 100644
--- a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
@@ -20,7 +20,7 @@ import sys
 import argparse
 from typing import Iterable
 
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
 
 from pyflink.common import Types, WatermarkStrategy, Time, Encoder
 from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/word_count.py b/flink-python/pyflink/examples/datastream/word_count.py
index ac5ac08bbc4..e827c5b53ca 100644
--- a/flink-python/pyflink/examples/datastream/word_count.py
+++ b/flink-python/pyflink/examples/datastream/word_count.py
@@ -21,8 +21,8 @@ import sys
 
 from pyflink.common import WatermarkStrategy, Encoder, Types
 from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
-                                           RollingPolicy)
+from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
+                                                       OutputFileConfig, RollingPolicy)
 
 
 word_count_data = ["To be, or not to be,--that is the question:--",