You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 07:52:44 UTC
[flink] 01/02: [FLINK-27966][python] Make the connector imports more explicit by adding connector type
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dd2b19995beb766f0cd0986b079f2d210c1e836
Author: Dian Fu <di...@apache.org>
AuthorDate: Mon Aug 8 11:18:13 2022 +0800
[FLINK-27966][python] Make the connector imports more explicit by adding connector type
---
.../docs/dev/python/datastream/data_types.md | 2 +-
.../python/datastream/intro_to_datastream_api.md | 11 +-
.../docs/dev/python/datastream_tutorial.md | 3 +-
docs/content.zh/docs/dev/table/data_stream_api.md | 12 +-
.../docs/dev/python/datastream/data_types.md | 2 +-
.../python/datastream/intro_to_datastream_api.md | 11 +-
.../content/docs/dev/python/datastream_tutorial.md | 3 +-
docs/content/docs/dev/table/data_stream_api.md | 10 +-
flink-python/pyflink/datastream/__init__.py | 41 +++---
.../pyflink/datastream/connectors/__init__.py | 145 ++++++++-------------
flink-python/pyflink/datastream/connectors/base.py | 28 ++--
.../pyflink/datastream/connectors/cassandra.py | 13 +-
.../pyflink/datastream/connectors/file_system.py | 15 ++-
flink-python/pyflink/datastream/connectors/jdbc.py | 7 +
.../pyflink/datastream/connectors/kafka.py | 8 +-
.../pyflink/datastream/connectors/kinesis.py | 18 ++-
.../pyflink/datastream/connectors/number_seq.py | 6 +-
.../pyflink/datastream/connectors/pulsar.py | 15 +++
.../pyflink/datastream/connectors/rabbitmq.py | 7 +
.../datastream/connectors/tests/test_connectors.py | 27 ++--
flink-python/pyflink/datastream/formats/csv.py | 4 +-
.../tests/test_stream_execution_environment.py | 2 +-
.../datastream/connectors/kafka_avro_format.py | 2 +-
.../datastream/connectors/kafka_csv_format.py | 2 +-
.../datastream/connectors/kafka_json_format.py | 2 +-
.../examples/datastream/connectors/pulsar.py | 6 +-
.../windowing/session_with_dynamic_gap_window.py | 2 +-
.../windowing/session_with_gap_window.py | 2 +-
.../datastream/windowing/sliding_time_window.py | 2 +-
.../datastream/windowing/tumbling_count_window.py | 2 +-
.../datastream/windowing/tumbling_time_window.py | 2 +-
.../pyflink/examples/datastream/word_count.py | 4 +-
32 files changed, 224 insertions(+), 192 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/datastream/data_types.md b/docs/content.zh/docs/dev/python/datastream/data_types.md
index e095d164628..6afb610a526 100644
--- a/docs/content.zh/docs/dev/python/datastream/data_types.md
+++ b/docs/content.zh/docs/dev/python/datastream/data_types.md
@@ -66,7 +66,7 @@ For example, types need to be provided if you want to output data using the File
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
def file_sink():
diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
index 730560deb3e..5eefc0376b2 100644
--- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -43,7 +43,8 @@ from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
@@ -156,7 +157,7 @@ You can also create a `DataStream` using DataStream connectors with method `add_
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -186,7 +187,7 @@ source connectors:
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import NumberSequenceSource
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
@@ -303,7 +304,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
```python
from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.common.serialization import JsonRowSerializationSchema
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
@@ -327,7 +328,7 @@ You could also call the `sink_to` method to emit the data of a `DataStream` to a
sink connector:
```python
-from pyflink.datastream.connectors import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder
output_path = '/opt/output/'
diff --git a/docs/content.zh/docs/dev/python/datastream_tutorial.md b/docs/content.zh/docs/dev/python/datastream_tutorial.md
index d9f7543052e..51bd05b54a5 100644
--- a/docs/content.zh/docs/dev/python/datastream_tutorial.md
+++ b/docs/content.zh/docs/dev/python/datastream_tutorial.md
@@ -123,8 +123,7 @@ import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
- RollingPolicy)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
word_count_data = ["To be, or not to be,--that is the question:--",
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 1e6adcdc11f..5aa882cdca6 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -2892,9 +2892,9 @@ env.execute()
```python
from pyflink.common import Encoder
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
-
+
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
@@ -2928,10 +2928,10 @@ statement_set.add_insert(sink_descriptor, table_from_stream)
statement_set.attach_as_datastream()
# define other DataStream API parts
-env.from_collection([4, 5, 6])
- .add_sink(FileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
- .build())
+env.from_collection([4, 5, 6]) \
+ .add_sink(FileSink
+ .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+ .build())
# use DataStream API to submit the pipelines
env.execute()
diff --git a/docs/content/docs/dev/python/datastream/data_types.md b/docs/content/docs/dev/python/datastream/data_types.md
index ea94131b182..e8058507211 100644
--- a/docs/content/docs/dev/python/datastream/data_types.md
+++ b/docs/content/docs/dev/python/datastream/data_types.md
@@ -66,7 +66,7 @@ For example, types need to be provided if you want to output data using the File
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
def file_sink():
diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
index 7a5ea957d51..daedb0f418b 100644
--- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -43,7 +43,8 @@ from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
@@ -156,7 +157,7 @@ You can also create a `DataStream` using DataStream connectors with method `add_
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -186,7 +187,7 @@ source connectors:
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import NumberSequenceSource
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
@@ -303,7 +304,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
```python
from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.common.serialization import JsonRowSerializationSchema
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
@@ -327,7 +328,7 @@ You could also call the `sink_to` method to emit the data of a `DataStream` to a
sink connector:
```python
-from pyflink.datastream.connectors import FileSink, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder
output_path = '/opt/output/'
diff --git a/docs/content/docs/dev/python/datastream_tutorial.md b/docs/content/docs/dev/python/datastream_tutorial.md
index 65462290477..b8187008d17 100644
--- a/docs/content/docs/dev/python/datastream_tutorial.md
+++ b/docs/content/docs/dev/python/datastream_tutorial.md
@@ -122,8 +122,7 @@ import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
- RollingPolicy)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
word_count_data = ["To be, or not to be,--that is the question:--",
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 02d67a20931..ba0753c04bb 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -2890,7 +2890,7 @@ env.execute()
```python
from pyflink.common import Encoder
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FileSink
+from pyflink.datastream.connectors.file_system import FileSink
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
env = StreamExecutionEnvironment.get_execution_environment()
@@ -2926,10 +2926,10 @@ statement_set.add_insert(sink_descriptor, table_from_stream)
statement_set.attach_as_datastream()
# define other DataStream API parts
-env.from_collection([4, 5, 6])
- .add_sink(FileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
- .build())
+env.from_collection([4, 5, 6]) \
+ .add_sink(FileSink
+ .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+ .build())
# use DataStream API to submit the pipelines
env.execute()
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 85035363b5f..32a3d092266 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -165,37 +165,48 @@ Classes to define source & sink:
- :class:`connectors.elasticsearch.ElasticsearchSink`:
A sink for publishing data into Elasticsearch 6 or Elasticsearch 7.
- - :class:`connectors.FlinkKafkaConsumer`:
+ - :class:`connectors.kafka.FlinkKafkaConsumer`:
A streaming data source that pulls a parallel data stream from Apache Kafka.
- - :class:`connectors.FlinkKafkaProducer`:
+ - :class:`connectors.kafka.FlinkKafkaProducer`:
A streaming data sink to produce data into a Kafka topic.
- - :class:`connectors.KafkaSource`:
+ - :class:`connectors.kafka.KafkaSource`:
The new API to read data in parallel from Apache Kafka.
- - :class:`connectors.KafkaSink`:
+ - :class:`connectors.kafka.KafkaSink`:
The new API to write data into Apache Kafka topics.
- - :class:`connectors.FileSource`:
+ - :class:`connectors.file_system.FileSource`:
A unified data source that reads files - both in batch and in streaming mode.
This source supports all (distributed) file systems and object stores that can be accessed via
the Flink's FileSystem class.
- - :class:`connectors.FileSink`:
+ - :class:`connectors.file_system.FileSink`:
A unified sink that emits its input elements to FileSystem files within buckets. This
sink achieves exactly-once semantics for both BATCH and STREAMING.
- - :class:`connectors.NumberSequenceSource`:
+ - :class:`connectors.file_system.StreamingFileSink`:
+ Sink that emits its input elements to files within buckets. This is integrated with the
+ checkpointing mechanism to provide exactly once semantics.
+ - :class:`connectors.number_seq.NumberSequenceSource`:
A data source that produces a sequence of numbers (longs). This source is useful for testing
and for cases that just need a stream of N events of any kind.
- - :class:`connectors.JdbcSink`:
+ - :class:`connectors.jdbc.JdbcSink`:
A data sink to produce data into an external storage using JDBC.
- - :class:`connectors.StreamingFileSink`:
- Sink that emits its input elements to files within buckets. This is integrated with the
- checkpointing mechanism to provide exactly once semantics.
- - :class:`connectors.PulsarSource`:
+ - :class:`connectors.pulsar.PulsarSource`:
A streaming data source that pulls a parallel data stream from Pulsar.
- - :class:`connectors.PulsarSink`:
+ - :class:`connectors.pulsar.PulsarSink`:
A streaming data sink to produce data into Pulsar.
- - :class:`connectors.RMQSource`:
+ - :class:`connectors.rabbitmq.RMQSource`:
A streaming data source that pulls a parallel data stream from RabbitMQ.
- - :class:`connectors.RMQSink`:
+ - :class:`connectors.rabbitmq.RMQSink`:
A Sink for publishing data into RabbitMQ.
+ - :class:`connectors.cassandra.CassandraSink`:
+ A Sink for publishing data into Cassandra.
+ - :class:`connectors.kinesis.FlinkKinesisConsumer`:
+ A streaming data source that pulls a parallel data stream from Kinesis.
+ - :class:`connectors.kinesis.KinesisStreamsSink`:
+ A Kinesis Data Streams (KDS) Sink that performs async requests against a destination stream
+ using the buffering protocol.
+ - :class:`connectors.kinesis.KinesisFirehoseSink`:
+ A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ stream using the buffering protocol.
+
Classes to define formats used together with source & sink:
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index c7581cbdf34..02681d299a8 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -16,99 +16,64 @@
# limitations under the License.
################################################################################
from pyflink.datastream.connectors.base import Sink, Source, DeliveryGuarantee
-from pyflink.datastream.connectors.elasticsearch import (Elasticsearch6SinkBuilder,
- Elasticsearch7SinkBuilder)
-from pyflink.datastream.connectors.file_system import (
- BucketAssigner,
- BulkFormat,
- BulkWriterFactory,
- FileEnumeratorProvider,
- FileSink,
- FileSplitAssignerProvider,
- FileSource,
- FileSourceBuilder,
- OutputFileConfig,
- RollingPolicy,
- StreamFormat,
- StreamingFileSink,
-)
-from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
-from pyflink.datastream.connectors.kafka import (
- FlinkKafkaConsumer,
- FlinkKafkaProducer,
- Semantic,
- KafkaSource,
- KafkaSourceBuilder,
- KafkaTopicPartition,
- KafkaOffsetsInitializer,
- KafkaOffsetResetStrategy,
- KafkaSink,
- KafkaSinkBuilder,
- KafkaRecordSerializationSchema,
- KafkaRecordSerializationSchemaBuilder,
- KafkaTopicSelector,
-)
-from pyflink.datastream.connectors.number_seq import NumberSequenceSource
-from pyflink.datastream.connectors.pulsar import PulsarDeserializationSchema, PulsarSource, \
- PulsarSourceBuilder, SubscriptionType, StartCursor, StopCursor, PulsarSerializationSchema, \
- PulsarSink, PulsarSinkBuilder, MessageDelayer, TopicRoutingMode
-from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource, RMQSink
-from pyflink.datastream.connectors.kinesis import (FlinkKinesisConsumer, KinesisStreamsSink,
- KinesisFirehoseSink)
-from pyflink.datastream.connectors.cassandra import CassandraSink
__all__ = [
'Sink',
'Source',
- 'DeliveryGuarantee',
- 'FileEnumeratorProvider',
- 'FileSink',
- 'FileSource',
- 'BucketAssigner',
- 'FileSourceBuilder',
- 'FileSplitAssignerProvider',
- 'FlinkKafkaConsumer',
- 'FlinkKafkaProducer',
- 'Semantic',
- 'KafkaSource',
- 'KafkaSourceBuilder',
- 'KafkaTopicPartition',
- 'KafkaOffsetsInitializer',
- 'KafkaOffsetResetStrategy',
- 'KafkaSink',
- 'KafkaSinkBuilder',
- 'KafkaRecordSerializationSchema',
- 'KafkaRecordSerializationSchemaBuilder',
- 'KafkaTopicSelector',
- 'JdbcSink',
- 'JdbcConnectionOptions',
- 'JdbcExecutionOptions',
- 'NumberSequenceSource',
- 'OutputFileConfig',
- 'PulsarDeserializationSchema',
- 'PulsarSource',
- 'PulsarSourceBuilder',
- 'SubscriptionType',
- 'PulsarSerializationSchema',
- 'PulsarSink',
- 'PulsarSinkBuilder',
- 'MessageDelayer',
- 'TopicRoutingMode',
- 'RMQConnectionConfig',
- 'RMQSource',
- 'RMQSink',
- 'RollingPolicy',
- 'StartCursor',
- 'StopCursor',
- 'BulkFormat',
- 'StreamFormat',
- 'BulkWriterFactory',
- 'StreamingFileSink',
- 'FlinkKinesisConsumer',
- 'KinesisStreamsSink',
- 'KinesisFirehoseSink',
- 'Elasticsearch6SinkBuilder',
- 'Elasticsearch7SinkBuilder',
- 'CassandraSink'
+ 'DeliveryGuarantee'
]
+
+
+def _install():
+ from pyflink.datastream import connectors
+
+ # number_seq
+ from pyflink.datastream.connectors import number_seq
+ setattr(connectors, 'NumberSequenceSource', number_seq.NumberSequenceSource)
+
+ # jdbc
+ from pyflink.datastream.connectors import jdbc
+ setattr(connectors, 'JdbcSink', jdbc.JdbcSink)
+ setattr(connectors, 'JdbcConnectionOptions', jdbc.JdbcConnectionOptions)
+ setattr(connectors, 'JdbcExecutionOptions', jdbc.JdbcExecutionOptions)
+
+ # kafka
+ from pyflink.datastream.connectors import kafka
+ setattr(connectors, 'KafkaSource', kafka.KafkaSource)
+ setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
+ setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
+ setattr(connectors, 'Semantic', kafka.Semantic)
+
+ # pulsar
+ from pyflink.datastream.connectors import pulsar
+ setattr(connectors, 'PulsarSource', pulsar.PulsarSource)
+ setattr(connectors, 'PulsarSourceBuilder', pulsar.PulsarSourceBuilder)
+ setattr(connectors, 'PulsarDeserializationSchema', pulsar.PulsarDeserializationSchema)
+ setattr(connectors, 'SubscriptionType', pulsar.SubscriptionType)
+ setattr(connectors, 'StartCursor', pulsar.StartCursor)
+ setattr(connectors, 'StopCursor', pulsar.StopCursor)
+
+ # rabbitmq
+ from pyflink.datastream.connectors import rabbitmq
+ setattr(connectors, 'RMQSource', rabbitmq.RMQSource)
+ setattr(connectors, 'RMQSink', rabbitmq.RMQSink)
+ setattr(connectors, 'RMQConnectionConfig', rabbitmq.RMQConnectionConfig)
+
+ # filesystem
+ from pyflink.datastream.connectors import file_system
+ setattr(connectors, 'BucketAssigner', file_system.BucketAssigner)
+ setattr(connectors, 'FileEnumeratorProvider', file_system.FileEnumeratorProvider)
+ setattr(connectors, 'FileSink', file_system.FileSink)
+ setattr(connectors, 'FileSplitAssignerProvider', file_system.FileSplitAssignerProvider)
+ setattr(connectors, 'FileSource', file_system.FileSource)
+ setattr(connectors, 'FileSourceBuilder', file_system.FileSourceBuilder)
+ setattr(connectors, 'OutputFileConfig', file_system.OutputFileConfig)
+ setattr(connectors, 'RollingPolicy', file_system.RollingPolicy)
+ setattr(connectors, 'StreamFormat', file_system.StreamFormat)
+ setattr(connectors, 'StreamingFileSink', file_system.StreamingFileSink)
+
+
+# for backward compatibility
+_install()
+del _install
diff --git a/flink-python/pyflink/datastream/connectors/base.py b/flink-python/pyflink/datastream/connectors/base.py
index fa8732a2cc6..77d268f860f 100644
--- a/flink-python/pyflink/datastream/connectors/base.py
+++ b/flink-python/pyflink/datastream/connectors/base.py
@@ -53,20 +53,6 @@ class Sink(JavaFunctionWrapper):
super(Sink, self).__init__(sink)
-class StreamTransformer(ABC):
-
- @abstractmethod
- def apply(self, ds):
- pass
-
-
-class SupportsPreprocessing(ABC):
-
- @abstractmethod
- def get_transformer(self) -> Optional[StreamTransformer]:
- pass
-
-
class DeliveryGuarantee(Enum):
"""
DeliverGuarantees that can be chosen. In general your pipeline can only offer the lowest
@@ -97,3 +83,17 @@ class DeliveryGuarantee(Enum):
JDeliveryGuarantee = get_gateway().jvm \
.org.apache.flink.connector.base.DeliveryGuarantee
return getattr(JDeliveryGuarantee, self.name)
+
+
+class StreamTransformer(ABC):
+
+ @abstractmethod
+ def apply(self, ds):
+ pass
+
+
+class SupportsPreprocessing(ABC):
+
+ @abstractmethod
+ def get_transformer(self) -> Optional[StreamTransformer]:
+ pass
diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py
index 4cca90753e5..5cd099d47d8 100644
--- a/flink-python/pyflink/datastream/connectors/cassandra.py
+++ b/flink-python/pyflink/datastream/connectors/cassandra.py
@@ -21,11 +21,14 @@ from pyflink.common import Duration
from pyflink.java_gateway import get_gateway
-__all__ = ['ConsistencyLevel',
- 'MapperOptions',
- 'ClusterBuilder',
- 'CassandraCommitter',
- 'CassandraFailureHandler']
+__all__ = [
+ 'CassandraSink',
+ 'ConsistencyLevel',
+ 'MapperOptions',
+ 'ClusterBuilder',
+ 'CassandraCommitter',
+ 'CassandraFailureHandler'
+]
# ---- Classes introduced to construct the MapperOptions ----
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 6a8816457bf..c8236333921 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -33,7 +33,20 @@ from pyflink.util.java_utils import to_jarray
__all__ = [
'FileCompactor',
- 'FileCompactStrategy'
+ 'FileCompactStrategy',
+ 'OutputFileConfig',
+ 'FileSource',
+ 'FileSourceBuilder',
+ 'FileSink',
+ 'StreamingFileSink',
+ 'BulkFormat',
+ 'StreamFormat',
+ 'InputFormat',
+ 'BulkWriterFactory',
+ 'FileEnumeratorProvider',
+ 'FileSplitAssignerProvider',
+ 'RollingPolicy',
+ 'BucketAssigner'
]
diff --git a/flink-python/pyflink/datastream/connectors/jdbc.py b/flink-python/pyflink/datastream/connectors/jdbc.py
index a7f1c985461..5270a0be918 100644
--- a/flink-python/pyflink/datastream/connectors/jdbc.py
+++ b/flink-python/pyflink/datastream/connectors/jdbc.py
@@ -21,6 +21,13 @@ from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray
+__all__ = [
+ 'JdbcSink',
+ 'JdbcConnectionOptions',
+ 'JdbcExecutionOptions'
+]
+
+
class JdbcSink(SinkFunction):
def __init__(self, j_jdbc_sink):
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py
index 707dd679046..e7d98f079d2 100644
--- a/flink-python/pyflink/datastream/connectors/kafka.py
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -34,17 +34,17 @@ from pyflink.util.java_utils import to_jarray, get_field, get_field_value
__all__ = [
'FlinkKafkaConsumer',
'FlinkKafkaProducer',
- 'Semantic',
'KafkaSource',
'KafkaSourceBuilder',
+ 'KafkaSink',
+ 'KafkaSinkBuilder',
+ 'Semantic',
'KafkaTopicPartition',
'KafkaOffsetsInitializer',
'KafkaOffsetResetStrategy',
- 'KafkaSink',
- 'KafkaSinkBuilder',
'KafkaRecordSerializationSchema',
'KafkaRecordSerializationSchemaBuilder',
- 'KafkaTopicSelector',
+ 'KafkaTopicSelector'
]
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py b/flink-python/pyflink/datastream/connectors/kinesis.py
index 06d18e7ed8c..ebf4666f2d1 100644
--- a/flink-python/pyflink/datastream/connectors/kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -23,13 +23,17 @@ from pyflink.datastream.functions import SourceFunction
from pyflink.datastream.connectors import Sink
from pyflink.java_gateway import get_gateway
-__all__ = ['KinesisShardAssigner',
- 'KinesisDeserializationSchema',
- 'WatermarkTracker',
- 'PartitionKeyGenerator',
- 'FlinkKinesisConsumer',
- 'KinesisStreamsSink',
- 'KinesisFirehoseSink']
+__all__ = [
+ 'KinesisShardAssigner',
+ 'KinesisDeserializationSchema',
+ 'WatermarkTracker',
+ 'PartitionKeyGenerator',
+ 'FlinkKinesisConsumer',
+ 'KinesisStreamsSink',
+ 'KinesisStreamsSinkBuilder',
+ 'KinesisFirehoseSink',
+ 'KinesisFirehoseSinkBuilder'
+]
# ---- KinesisSource ----
diff --git a/flink-python/pyflink/datastream/connectors/number_seq.py b/flink-python/pyflink/datastream/connectors/number_seq.py
index fca9a28703f..ee5cba987b3 100644
--- a/flink-python/pyflink/datastream/connectors/number_seq.py
+++ b/flink-python/pyflink/datastream/connectors/number_seq.py
@@ -18,6 +18,10 @@
from pyflink.datastream.connectors import Source
from pyflink.java_gateway import get_gateway
+__all__ = [
+ 'NumberSequenceSource'
+]
+
class NumberSequenceSource(Source):
"""
@@ -33,7 +37,7 @@ class NumberSequenceSource(Source):
because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
"""
- def __init__(self, start, end):
+ def __init__(self, start: int, end: int):
"""
Creates a new NumberSequenceSource that produces parallel sequences covering the
range start to end (both boundaries are inclusive).
diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py
index 79efca87f04..c9ad53dfd62 100644
--- a/flink-python/pyflink/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/pulsar.py
@@ -26,6 +26,21 @@ from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import load_java_class
+__all__ = [
+ 'PulsarSource',
+ 'PulsarSourceBuilder',
+ 'PulsarDeserializationSchema',
+ 'SubscriptionType',
+ 'StartCursor',
+ 'StopCursor',
+ 'PulsarSink',
+ 'PulsarSinkBuilder',
+ 'PulsarSerializationSchema',
+ 'MessageDelayer',
+ 'TopicRoutingMode'
+]
+
+
# ---- PulsarSource ----
diff --git a/flink-python/pyflink/datastream/connectors/rabbitmq.py b/flink-python/pyflink/datastream/connectors/rabbitmq.py
index e97eb42a073..b0f6ed45e7f 100644
--- a/flink-python/pyflink/datastream/connectors/rabbitmq.py
+++ b/flink-python/pyflink/datastream/connectors/rabbitmq.py
@@ -20,6 +20,13 @@ from pyflink.datastream.functions import SinkFunction, SourceFunction
from pyflink.java_gateway import get_gateway
+__all__ = [
+ 'RMQConnectionConfig',
+ 'RMQSource',
+ 'RMQSink'
+]
+
+
class RMQConnectionConfig(object):
"""
Connection Configuration for RMQ.
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
index d6631fe3a02..1d12083cd3e 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py
@@ -16,23 +16,26 @@
# limitations under the License.
################################################################################
-from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
- FlushBackoffType, ElasticsearchEmitter
-
from pyflink.common import typeinfo, Duration, WatermarkStrategy, ConfigOptions
from pyflink.common.serialization import JsonRowDeserializationSchema, \
JsonRowSerializationSchema, Encoder, SimpleStringSchema
from pyflink.common.typeinfo import Types
-from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer, JdbcSink, \
- JdbcConnectionOptions, JdbcExecutionOptions, StreamingFileSink, \
- OutputFileConfig, FileSource, StreamFormat, FileEnumeratorProvider, FileSplitAssignerProvider, \
- NumberSequenceSource, RollingPolicy, FileSink, BucketAssigner, RMQSink, RMQSource, \
- RMQConnectionConfig, PulsarSource, StartCursor, PulsarDeserializationSchema, StopCursor, \
- SubscriptionType, PulsarSink, PulsarSerializationSchema, DeliveryGuarantee, TopicRoutingMode, \
- MessageDelayer, FlinkKinesisConsumer, KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.cassandra import CassandraSink, MapperOptions, ConsistencyLevel
-from pyflink.datastream.connectors.file_system import FileCompactStrategy, FileCompactor
-from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
+ FlushBackoffType, ElasticsearchEmitter
+from pyflink.datastream.connectors.file_system import FileCompactStrategy, FileCompactor, \
+ StreamingFileSink, OutputFileConfig, FileSource, StreamFormat, FileEnumeratorProvider, \
+ FileSplitAssignerProvider, RollingPolicy, FileSink, BucketAssigner
+from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
+from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, FlinkKinesisConsumer, \
+ KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.datastream.connectors.pulsar import PulsarSerializationSchema, TopicRoutingMode, \
+ MessageDelayer, PulsarSink, PulsarSource, StartCursor, PulsarDeserializationSchema, \
+ StopCursor, SubscriptionType
+from pyflink.datastream.connectors.rabbitmq import RMQSink, RMQSource, RMQConnectionConfig
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import invoke_java_object_method, PyFlinkStreamingTestCase
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index e5d18dd2309..d845bde6449 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -18,8 +18,8 @@
from typing import Optional
from pyflink.common.typeinfo import _from_java_type
-from pyflink.datastream.connectors import StreamFormat
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory, \
+ StreamFormat
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 06ce00eb1c8..c80ab5f7d9b 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -32,7 +32,7 @@ from pyflink.common.typeinfo import Types
from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
CheckpointingMode, MemoryStateBackend, TimeCharacteristic,
SlotSharingGroup)
-from pyflink.datastream.connectors import FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import SourceFunction
from pyflink.datastream.slot_sharing_group import MemorySize
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index 9e3b2658d89..dd3711aff90 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -20,7 +20,7 @@ import sys
from pyflink.common import AvroRowSerializationSchema, Types, AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 21aa3ab17a6..99d5d62e90c 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -20,7 +20,7 @@ import sys
from pyflink.common import Types, JsonRowDeserializationSchema, CsvRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index 10bf85394a2..cf650fdb731 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -20,7 +20,7 @@ import sys
from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
index 2278d522dbf..8e0d60879f7 100644
--- a/flink-python/pyflink/examples/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -21,9 +21,9 @@ import sys
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import PulsarSource, PulsarSink, PulsarSerializationSchema, \
- StartCursor, StopCursor, SubscriptionType, PulsarDeserializationSchema, DeliveryGuarantee, \
- TopicRoutingMode
+from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,\
+ PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
+ PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
index ebfce5212d0..82cf8a991a2 100644
--- a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
@@ -20,7 +20,7 @@ import sys
import argparse
from typing import Iterable
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
index 2cca566264d..39ba8093abe 100644
--- a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
@@ -20,7 +20,7 @@ import sys
import argparse
from typing import Iterable
-from pyflink.datastream.connectors import FileSink, RollingPolicy, OutputFileConfig
+from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
index 3f68d8439d9..287a431614f 100644
--- a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
@@ -20,7 +20,7 @@ import sys
import argparse
from typing import Iterable
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
index 084757608e0..580dd84ec6f 100644
--- a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
@@ -20,7 +20,7 @@ import sys
import argparse
from typing import Iterable
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
index 0576fe6dc90..3fbd150fcc2 100644
--- a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
@@ -20,7 +20,7 @@ import sys
import argparse
from typing import Iterable
-from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
diff --git a/flink-python/pyflink/examples/datastream/word_count.py b/flink-python/pyflink/examples/datastream/word_count.py
index ac5ac08bbc4..e827c5b53ca 100644
--- a/flink-python/pyflink/examples/datastream/word_count.py
+++ b/flink-python/pyflink/examples/datastream/word_count.py
@@ -21,8 +21,8 @@ import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
-from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
- RollingPolicy)
+from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
+ OutputFileConfig, RollingPolicy)
word_count_data = ["To be, or not to be,--that is the question:--",