You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 14:52:31 UTC
[flink] branch master updated: [hotfix][python] Make the format imports more explicit by adding format type
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 413912d7fd3 [hotfix][python] Make the format imports more explicit by adding format type
413912d7fd3 is described below
commit 413912d7fd3c1c3aa4c3a8efda70b4327fa99368
Author: Dian Fu <di...@apache.org>
AuthorDate: Mon Aug 8 18:20:56 2022 +0800
[hotfix][python] Make the format imports more explicit by adding format type
---
flink-python/docs/pyflink.datastream.rst | 78 ++++++++++++++++++++--
flink-python/pyflink/datastream/__init__.py | 30 ++++-----
.../pyflink/datastream/connectors/cassandra.py | 2 +-
.../pyflink/datastream/connectors/file_system.py | 44 ++++++------
.../pyflink/datastream/connectors/kinesis.py | 15 +++--
.../pyflink/datastream/formats/__init__.py | 17 -----
flink-python/pyflink/datastream/formats/avro.py | 11 ++-
flink-python/pyflink/datastream/formats/csv.py | 7 ++
flink-python/pyflink/datastream/formats/parquet.py | 7 ++
.../pyflink/datastream/formats/tests/test_avro.py | 2 +-
.../pyflink/datastream/formats/tests/test_csv.py | 2 +-
.../datastream/formats/tests/test_parquet.py | 2 +-
12 files changed, 142 insertions(+), 75 deletions(-)
diff --git a/flink-python/docs/pyflink.datastream.rst b/flink-python/docs/pyflink.datastream.rst
index cc12f772c25..646d251c563 100644
--- a/flink-python/docs/pyflink.datastream.rst
+++ b/flink-python/docs/pyflink.datastream.rst
@@ -28,6 +28,12 @@ Module contents
:show-inheritance:
:inherited-members:
+pyflink.datastream.window module
+--------------------------------
+.. automodule:: pyflink.datastream.window
+ :members:
+ :undoc-members:
+
pyflink.datastream.state module
------------------------------------
.. automodule:: pyflink.datastream.state
@@ -41,14 +47,74 @@ pyflink.datastream.connectors module
:members:
:undoc-members:
-pyflink.datastream.formats module
-------------------------------------
-.. automodule:: pyflink.datastream.formats
+pyflink.datastream.connectors.kafka module
+---------------------------------------------
+.. automodule:: pyflink.datastream.connectors.kafka
:members:
:undoc-members:
-pyflink.datastream.window module
---------------------------------
-.. automodule:: pyflink.datastream.window
+pyflink.datastream.connectors.file_system module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.file_system
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.kinesis module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.kinesis
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.number_seq module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.number_seq
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.jdbc module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.jdbc
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.elasticsearch module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.elasticsearch
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.rabbitmq module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.rabbitmq
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.pulsar module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.pulsar
+ :members:
+ :undoc-members:
+
+pyflink.datastream.connectors.cassandra module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.cassandra
+ :members:
+ :undoc-members:
+
+pyflink.datastream.formats.csv module
+------------------------------------------
+.. automodule:: pyflink.datastream.formats.csv
+ :members:
+ :undoc-members:
+
+pyflink.datastream.formats.avro module
+---------------------------------------------
+.. automodule:: pyflink.datastream.formats.avro
+ :members:
+ :undoc-members:
+
+pyflink.datastream.formats.parquet module
+---------------------------------------------
+.. automodule:: pyflink.datastream.formats.parquet
:members:
:undoc-members:
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 32a3d092266..11ebea667f4 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -210,25 +210,25 @@ Classes to define source & sink:
Classes to define formats used together with source & sink:
- - :class:`formats.CsvReaderFormat`:
- A :class:`connectors.StreamFormat` to read CSV files into Row data.
- - :class:`formats.CsvBulkWriter`:
- Creates :class:`connectors.BulkWriterFactory` to write Row data into CSV files.
- - :class:`formats.GenericRecordAvroTypeInfo`:
- A :class:`TypeInformation` to indicate vanilla Python records will be translated to
- GenericRecordAvroTypeInfo on the Java side.
- - :class:`formats.AvroInputFormat`:
- A :class:`connector.filesystem.InputFormat` to read avro files in a streaming fashion.
- - :class:`formats.AvroWriters`:
- A class to provide :class:`connector.filesystem.BulkWriterFactory` to write vanilla Python
+ - :class:`formats.csv.CsvReaderFormat`:
+ A :class:`connectors.file_system.StreamFormat` to read CSV files into Row data.
+ - :class:`formats.csv.CsvBulkWriter`:
+ Creates :class:`connectors.file_system.BulkWriterFactory` to write Row data into CSV files.
+ - :class:`formats.avro.GenericRecordAvroTypeInfo`:
+ A :class:`pyflink.common.typeinfo.TypeInformation` to indicate vanilla Python records will be
+ translated to GenericRecordAvroTypeInfo on the Java side.
+ - :class:`formats.avro.AvroInputFormat`:
+ A :class:`connectors.file_system.InputFormat` to read avro files in a streaming fashion.
+ - :class:`formats.avro.AvroWriters`:
+ A class to provide :class:`connectors.file_system.BulkWriterFactory` to write vanilla Python
objects into avro files in a batch fashion.
- - :class:`formats.ParquetColumnarRowInputFormat`:
- A :class:`connectors.BulkFormat` to read columnar parquet files into Row data in a
+ - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
+ A :class:`connectors.file_system.BulkFormat` to read columnar parquet files into Row data in a
batch-processing fashion.
- - :class:`formats.AvroParquetReaders`:
+ - :class:`formats.parquet.AvroParquetReaders`:
A convenience builder to create reader format that reads individual Avro records from a
Parquet stream. Only GenericRecord is supported in PyFlink.
- - :class:`formats.AvroParquetWriters`:
+ - :class:`formats.parquet.AvroParquetWriters`:
Convenience builder to create ParquetWriterFactory instances for Avro types. Only
GenericRecord is supported in PyFlink.
diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py
index 5cd099d47d8..8fa44fa99e8 100644
--- a/flink-python/pyflink/datastream/connectors/cassandra.py
+++ b/flink-python/pyflink/datastream/connectors/cassandra.py
@@ -155,7 +155,7 @@ class CassandraCommitter(object):
CheckpointCommitter that saves information about completed checkpoints within a separate
table in a cassandra database.
- Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+ Entries are in the form: | operator_id | subtask_id | last_completed_checkpoint |
"""
JCassandraCommitter = get_gateway().jvm.org.apache.flink.streaming.connectors.\
cassandra.CassandraCommitter
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index c8236333921..1a147495a1d 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -109,9 +109,8 @@ class StreamFormat(object):
"""
A reader format that reads individual records from a stream.
- Compared to the :class:`~pyflink.datastream.connectors.FileSource.BulkFormat`, the stream
- format handles a few things out-of-the-box, like deciding how to batch records or dealing
- with compression.
+ Compared to the :class:`~BulkFormat`, the stream format handles a few things out-of-the-box,
+ like deciding how to batch records or dealing with compression.
Internally in the file source, the readers pass batches of records from the reading threads
(that perform the typically blocking I/O operations) to the async mailbox threads that do
@@ -202,12 +201,11 @@ class RowDataBulkWriterFactory(BulkWriterFactory):
class FileSourceBuilder(object):
"""
- The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the
- various behaviors.
+ The builder for the :class:`~FileSource`, to configure the various behaviors.
Start building the source via one of the following methods:
- - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
+ - :func:`~FileSource.for_record_stream_format`
"""
def __init__(self, j_file_source_builder):
@@ -283,10 +281,10 @@ class FileSource(Source):
Start building a file source via one of the following calls:
- - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
+ - :func:`~FileSource.for_record_stream_format`
- This creates a :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` on which
- you can configure all the properties of the file source.
+ This creates a :class:`~FileSource.FileSourceBuilder` on which you can configure all the
+ properties of the file source.
<h2>Batch and Streaming</h2>
@@ -296,10 +294,10 @@ class FileSource(Source):
reading those.
When you start creating a file source (via the
- :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` created
- through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call
- :func:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder.monitor_continuously` to put
- the source into continuous streaming mode.
+ :class:`~FileSource.FileSourceBuilder` created through one of the above-mentioned methods)
+ the source is by default in bounded/batch mode. Call
+ :func:`~FileSource.FileSourceBuilder.monitor_continuously` to put the source into continuous
+ streaming mode.
<h2>Format Types</h2>
@@ -308,18 +306,17 @@ class FileSource(Source):
source supports. Their interfaces trade off simplicity of implementation and
flexibility/efficiency.
- - A :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` reads the contents of
- a file from a file stream. It is the simplest format to implement, and provides many
- features out-of-the-box (like checkpointing logic) but is limited in the optimizations it
+ - A :class:`~FileSource.StreamFormat` reads the contents of a file from a file stream.
+ It is the simplest format to implement, and provides many features out-of-the-box
+ (like checkpointing logic) but is limited in the optimizations it
can apply (such as object reuse, batching, etc.).
<h2>Discovering / Enumerating Files</h2>
The way that the source lists the files to be processed is defined by the
- :class:`~pyflink.datastream.connectors.FileSource.FileEnumeratorProvider`. The
- FileEnumeratorProvider is responsible to select the relevant files (for example filter out
- hidden files) and to optionally splits files into multiple regions (= file source splits) that
- can be read in parallel).
+ :class:`~FileSource.FileEnumeratorProvider`. The FileEnumeratorProvider is responsible for
+ selecting the relevant files (for example, filtering out hidden files) and optionally splitting
+ files into multiple regions (= file source splits) that can be read in parallel.
"""
def __init__(self, j_file_source):
@@ -328,8 +325,7 @@ class FileSource(Source):
@staticmethod
def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder:
"""
- Builds a new FileSource using a
- :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` to read record-by-record
+ Builds a new FileSource using a :class:`~FileSource.StreamFormat` to read record-by-record
from a file stream.
When possible, stream-based formats are generally easier (preferable) to file-based
@@ -632,8 +628,8 @@ class FileSink(Sink, SupportsPreprocessing):
on every checkpoint or use time or a property of the element to determine the bucket directory.
The default BucketAssigner is a DateTimeBucketAssigner which will create one new
bucket every hour. You can specify a custom BucketAssigner using the
- :func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`,
- after calling :class:`~pyflink.datastream.connectors.FileSink.for_row_format`.
+ :func:`~FileSink.RowFormatBuilder.with_bucket_assigner`, after calling
+ :func:`~FileSink.for_row_format`.
The names of the part files could be defined using OutputFileConfig. This
configuration contains a part prefix and a part suffix that will be used with a random uid
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py b/flink-python/pyflink/datastream/connectors/kinesis.py
index ebf4666f2d1..355c954893e 100644
--- a/flink-python/pyflink/datastream/connectors/kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -262,6 +262,7 @@ class KinesisStreamsSinkBuilder(object):
Example:
::
+
>>> from pyflink.common.serialization import SimpleStringSchema
>>> sink_properties = {"aws.region": "eu-west-1"}
>>> sink = KinesisStreamsSink.builder() \\
@@ -273,13 +274,13 @@ class KinesisStreamsSinkBuilder(object):
If the following parameters are not set in this builder, the following defaults will be used:
- - maxBatchSize will be 500
- - maxInFlightRequests will be 50
- - maxBufferedRequests will be 10000
- - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
- - maxTimeInBufferMS will be 5000ms
- - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
- - failOnError will be false
+ - maxBatchSize will be 500
+ - maxInFlightRequests will be 50
+ - maxBufferedRequests will be 10000
+ - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
+ - maxTimeInBufferMS will be 5000ms
+ - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
+ - failOnError will be false
"""
def __init__(self):
diff --git a/flink-python/pyflink/datastream/formats/__init__.py b/flink-python/pyflink/datastream/formats/__init__.py
index 48d582cfaff..65b48d4d79b 100644
--- a/flink-python/pyflink/datastream/formats/__init__.py
+++ b/flink-python/pyflink/datastream/formats/__init__.py
@@ -15,20 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from .avro import AvroSchema, AvroInputFormat, AvroWriters, GenericRecordAvroTypeInfo
-from .csv import CsvSchema, CsvSchemaBuilder, CsvReaderFormat, CsvBulkWriter
-from .parquet import AvroParquetReaders, AvroParquetWriters, ParquetColumnarRowInputFormat
-
-__all__ = [
- 'AvroInputFormat',
- 'AvroParquetReaders',
- 'AvroParquetWriters',
- 'AvroSchema',
- 'AvroWriters',
- 'CsvBulkWriter',
- 'CsvReaderFormat',
- 'CsvSchema',
- 'CsvSchemaBuilder',
- 'GenericRecordAvroTypeInfo',
- 'ParquetColumnarRowInputFormat'
-]
diff --git a/flink-python/pyflink/datastream/formats/avro.py b/flink-python/pyflink/datastream/formats/avro.py
index 2ffb5e6d407..66a16eb9f7d 100644
--- a/flink-python/pyflink/datastream/formats/avro.py
+++ b/flink-python/pyflink/datastream/formats/avro.py
@@ -24,6 +24,14 @@ from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import get_field_value
+__all__ = [
+ 'AvroInputFormat',
+ 'AvroSchema',
+ 'AvroWriters',
+ 'GenericRecordAvroTypeInfo'
+]
+
+
class AvroSchema(object):
"""
Avro Schema class contains Java org.apache.avro.Schema.
@@ -103,7 +111,7 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
def __init__(self, path: str, schema: 'AvroSchema'):
"""
:param path: The path to Avro data file.
- :param schema: The :class:`Schema` of generic record.
+ :param schema: The :class:`AvroSchema` of generic record.
"""
jvm = get_gateway().jvm
j_avro_input_format = jvm.org.apache.flink.formats.avro.AvroInputFormat(
@@ -111,7 +119,6 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord)
)
super().__init__(j_avro_input_format)
- self._schema = schema
self._type_info = GenericRecordAvroTypeInfo(schema)
def get_produced_type(self) -> GenericRecordAvroTypeInfo:
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index d845bde6449..a0d0f803440 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -23,6 +23,13 @@ from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowData
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
+__all__ = [
+ 'CsvBulkWriter',
+ 'CsvReaderFormat',
+ 'CsvSchema',
+ 'CsvSchemaBuilder'
+]
+
class CsvSchema(object):
"""
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index b23198ca509..39bc6a7d321 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -22,6 +22,13 @@ from pyflink.java_gateway import get_gateway
from pyflink.table.types import RowType, _to_java_data_type
+__all__ = [
+ 'AvroParquetReaders',
+ 'AvroParquetWriters',
+ 'ParquetColumnarRowInputFormat'
+]
+
+
class AvroParquetReaders(object):
"""
A convenience builder to create reader format that reads individual Avro records from a
diff --git a/flink-python/pyflink/datastream/formats/tests/test_avro.py b/flink-python/pyflink/datastream/formats/tests/test_avro.py
index ade29e13dbd..01e7c1ea66a 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_avro.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_avro.py
@@ -26,7 +26,7 @@ from py4j.java_gateway import JavaObject, java_import
from pyflink.datastream import MapFunction
from pyflink.datastream.connectors.file_system import FileSink
-from pyflink.datastream.formats import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
+from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
AvroInputFormat
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index 326095392e4..e4b14a9ec35 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -23,7 +23,7 @@ from typing import Tuple, List
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import MapFunction
from pyflink.datastream.connectors.file_system import FileSource, FileSink
-from pyflink.datastream.formats import CsvSchema, CsvReaderFormat, CsvBulkWriter
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriter
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.table import DataTypes
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index 97e8ffaf0a4..b914d185e9f 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -34,7 +34,7 @@ from pyflink.datastream.formats.tests.test_avro import \
_create_map_avro_schema_and_records, _create_array_avro_schema_and_records, \
_create_union_avro_schema_and_records, _create_enum_avro_schema_and_records, \
_create_basic_avro_schema_and_records, _import_avro_classes
-from pyflink.datastream.formats import GenericRecordAvroTypeInfo, AvroSchema
+from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
from pyflink.datastream.formats.parquet import AvroParquetReaders, ParquetColumnarRowInputFormat, \
AvroParquetWriters
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction