Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 14:52:31 UTC

[flink] branch master updated: [hotfix][python] Make the format imports more explicit by adding format type

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 413912d7fd3 [hotfix][python] Make the format imports more explicit by adding format type
413912d7fd3 is described below

commit 413912d7fd3c1c3aa4c3a8efda70b4327fa99368
Author: Dian Fu <di...@apache.org>
AuthorDate: Mon Aug 8 18:20:56 2022 +0800

    [hotfix][python] Make the format imports more explicit by adding format type
---
 flink-python/docs/pyflink.datastream.rst           | 78 ++++++++++++++++++++--
 flink-python/pyflink/datastream/__init__.py        | 30 ++++-----
 .../pyflink/datastream/connectors/cassandra.py     |  2 +-
 .../pyflink/datastream/connectors/file_system.py   | 44 ++++++------
 .../pyflink/datastream/connectors/kinesis.py       | 15 +++--
 .../pyflink/datastream/formats/__init__.py         | 17 -----
 flink-python/pyflink/datastream/formats/avro.py    | 11 ++-
 flink-python/pyflink/datastream/formats/csv.py     |  7 ++
 flink-python/pyflink/datastream/formats/parquet.py |  7 ++
 .../pyflink/datastream/formats/tests/test_avro.py  |  2 +-
 .../pyflink/datastream/formats/tests/test_csv.py   |  2 +-
 .../datastream/formats/tests/test_parquet.py       |  2 +-
 12 files changed, 142 insertions(+), 75 deletions(-)
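
In short, format classes are no longer imported from the aggregating pyflink.datastream.formats package; the import path now names the concrete format type. A minimal before/after sketch of the change as it appears in user code (class names taken from the diff below):

    # before: format classes re-exported from the package root
    from pyflink.datastream.formats import CsvReaderFormat, AvroSchema

    # after: the format type is explicit in the import path
    from pyflink.datastream.formats.csv import CsvReaderFormat
    from pyflink.datastream.formats.avro import AvroSchema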

diff --git a/flink-python/docs/pyflink.datastream.rst b/flink-python/docs/pyflink.datastream.rst
index cc12f772c25..646d251c563 100644
--- a/flink-python/docs/pyflink.datastream.rst
+++ b/flink-python/docs/pyflink.datastream.rst
@@ -28,6 +28,12 @@ Module contents
     :show-inheritance:
     :inherited-members:
 
+pyflink.datastream.window module
+--------------------------------
+.. automodule:: pyflink.datastream.window
+    :members:
+    :undoc-members:
+
 pyflink.datastream.state module
 ------------------------------------
 .. automodule:: pyflink.datastream.state
@@ -41,14 +47,74 @@ pyflink.datastream.connectors module
     :members:
     :undoc-members:
 
-pyflink.datastream.formats module
-------------------------------------
-.. automodule:: pyflink.datastream.formats
+pyflink.datastream.connectors.kafka module
+---------------------------------------------
+.. automodule:: pyflink.datastream.connectors.kafka
     :members:
     :undoc-members:
 
-pyflink.datastream.window module
---------------------------------
-.. automodule:: pyflink.datastream.window
+pyflink.datastream.connectors.file_system module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.file_system
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.kinesis module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.kinesis
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.number_seq module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.number_seq
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.jdbc module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.jdbc
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.elasticsearch module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.elasticsearch
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.rabbitmq module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.rabbitmq
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.pulsar module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.pulsar
+    :members:
+    :undoc-members:
+
+pyflink.datastream.connectors.cassandra module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.cassandra
+    :members:
+    :undoc-members:
+
+pyflink.datastream.formats.csv module
+------------------------------------------
+.. automodule:: pyflink.datastream.formats.csv
+    :members:
+    :undoc-members:
+
+pyflink.datastream.formats.avro module
+---------------------------------------------
+.. automodule:: pyflink.datastream.formats.avro
+    :members:
+    :undoc-members:
+
+pyflink.datastream.formats.parquet module
+---------------------------------------------
+.. automodule:: pyflink.datastream.formats.parquet
     :members:
     :undoc-members:
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 32a3d092266..11ebea667f4 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -210,25 +210,25 @@ Classes to define source & sink:
 
 Classes to define formats used together with source & sink:
 
-    - :class:`formats.CsvReaderFormat`:
-      A :class:`connectors.StreamFormat` to read CSV files into Row data.
-    - :class:`formats.CsvBulkWriter`:
-      Creates :class:`connectors.BulkWriterFactory` to write Row data into CSV files.
-    - :class:`formats.GenericRecordAvroTypeInfo`:
-      A :class:`TypeInformation` to indicate vanilla Python records will be translated to
-      GenericRecordAvroTypeInfo on the Java side.
-    - :class:`formats.AvroInputFormat`:
-      A :class:`connector.filesystem.InputFormat` to read avro files in a streaming fashion.
-    - :class:`formats.AvroWriters`:
-      A class to provide :class:`connector.filesystem.BulkWriterFactory` to write vanilla Python
+    - :class:`formats.csv.CsvReaderFormat`:
+      A :class:`connectors.file_system.StreamFormat` to read CSV files into Row data.
+    - :class:`formats.csv.CsvBulkWriter`:
+      Creates :class:`connectors.file_system.BulkWriterFactory` to write Row data into CSV files.
+    - :class:`formats.avro.GenericRecordAvroTypeInfo`:
+      A :class:`pyflink.common.typeinfo.TypeInformation` to indicate that vanilla Python records
+      will be translated to GenericRecordAvroTypeInfo on the Java side.
+    - :class:`formats.avro.AvroInputFormat`:
+      A :class:`connectors.file_system.InputFormat` to read avro files in a streaming fashion.
+    - :class:`formats.avro.AvroWriters`:
+      A class to provide :class:`connectors.file_system.BulkWriterFactory` to write vanilla Python
       objects into avro files in a batch fashion.
-    - :class:`formats.ParquetColumnarRowInputFormat`:
-      A :class:`connectors.BulkFormat` to read columnar parquet files into Row data in a
+    - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
+      A :class:`connectors.file_system.BulkFormat` to read columnar parquet files into Row data in a
       batch-processing fashion.
-    - :class:`formats.AvroParquetReaders`:
+    - :class:`formats.parquet.AvroParquetReaders`:
       A convenience builder to create reader format that reads individual Avro records from a
       Parquet stream. Only GenericRecord is supported in PyFlink.
-    - :class:`formats.AvroParquetWriters`:
+    - :class:`formats.parquet.AvroParquetWriters`:
       Convenience builder to create ParquetWriterFactory instances for Avro types. Only
       GenericRecord is supported in PyFlink.
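
As a quick illustration of the new explicit paths listed above, a minimal sketch that wires CsvReaderFormat into a FileSource (the file path and column names are hypothetical):

    from pyflink.common import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSource
    from pyflink.datastream.formats.csv import CsvReaderFormat, CsvSchema

    env = StreamExecutionEnvironment.get_execution_environment()
    # A CsvSchema with hypothetical columns; CsvReaderFormat reads CSV files into Row data.
    schema = CsvSchema.builder() \
        .add_string_column('name') \
        .add_number_column('age') \
        .build()
    source = FileSource.for_record_stream_format(
        CsvReaderFormat.for_schema(schema), '/tmp/input.csv').build()
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')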
 
diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py
index 5cd099d47d8..8fa44fa99e8 100644
--- a/flink-python/pyflink/datastream/connectors/cassandra.py
+++ b/flink-python/pyflink/datastream/connectors/cassandra.py
@@ -155,7 +155,7 @@ class CassandraCommitter(object):
         CheckpointCommitter that saves information about completed checkpoints within a separate
         table in a Cassandra database.
 
-        Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+        Entries are in the form: | operator_id | subtask_id | last_completed_checkpoint |
         """
         JCassandraCommitter = get_gateway().jvm.org.apache.flink.streaming.connectors.\
             cassandra.CassandraCommitter
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index c8236333921..1a147495a1d 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -109,9 +109,8 @@ class StreamFormat(object):
     """
     A reader format that reads individual records from a stream.
 
-    Compared to the :class:`~pyflink.datastream.connectors.FileSource.BulkFormat`, the stream
-    format handles a few things out-of-the-box, like deciding how to batch records or dealing
-    with compression.
+    Compared to the :class:`~BulkFormat`, the stream format handles a few things out-of-the-box,
+    like deciding how to batch records or dealing with compression.
 
     Internally in the file source, the readers pass batches of records from the reading threads
     (that perform the typically blocking I/O operations) to the async mailbox threads that do
@@ -202,12 +201,11 @@ class RowDataBulkWriterFactory(BulkWriterFactory):
 
 class FileSourceBuilder(object):
     """
-    The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the
-    various behaviors.
+    The builder for the :class:`~FileSource`, to configure the various behaviors.
 
     Start building the source via one of the following methods:
 
-        - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
+        - :func:`~FileSource.for_record_stream_format`
     """
 
     def __init__(self, j_file_source_builder):
@@ -283,10 +281,10 @@ class FileSource(Source):
 
     Start building a file source via one of the following calls:
 
-        - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
+        - :func:`~FileSource.for_record_stream_format`
 
-    This creates a :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` on which
-    you can configure all the properties of the file source.
+    This creates a :class:`~FileSource.FileSourceBuilder` on which you can configure all the
+    properties of the file source.
 
     <h2>Batch and Streaming</h2>
 
@@ -296,10 +294,10 @@ class FileSource(Source):
     reading those.
 
     When you start creating a file source (via the
-    :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` created
-    through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call
-    :func:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder.monitor_continuously` to put
-    the source into continuous streaming mode.
+    :class:`~FileSource.FileSourceBuilder` created through one of the above-mentioned methods)
+    the source is by default in bounded/batch mode. Call
+    :func:`~FileSource.FileSourceBuilder.monitor_continuously` to put the source into continuous
+    streaming mode.
 
     <h2>Format Types</h2>
 
@@ -308,18 +306,17 @@ class FileSource(Source):
     source supports. Their interfaces trade off simplicity of implementation against
     flexibility/efficiency.
 
-        - A :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` reads the contents of
-          a file from a file stream. It is the simplest format to implement, and provides many
-          features out-of-the-box (like checkpointing logic) but is limited in the optimizations it
+        - A :class:`~FileSource.StreamFormat` reads the contents of a file from a file stream.
+          It is the simplest format to implement, and provides many features out-of-the-box
+          (like checkpointing logic) but is limited in the optimizations it
           can apply (such as object reuse, batching, etc.).
 
     <h2>Discovering / Enumerating Files</h2>
 
     The way that the source lists the files to be processed is defined by the
-    :class:`~pyflink.datastream.connectors.FileSource.FileEnumeratorProvider`. The
-    FileEnumeratorProvider is responsible to select the relevant files (for example filter out
-    hidden files) and to optionally splits files into multiple regions (= file source splits) that
-    can be read in parallel).
+    :class:`~FileSource.FileEnumeratorProvider`. The FileEnumeratorProvider is responsible for
+    selecting the relevant files (for example, filtering out hidden files) and for optionally
+    splitting files into multiple regions (= file source splits) that can be read in parallel.
     """
 
     def __init__(self, j_file_source):
@@ -328,8 +325,7 @@ class FileSource(Source):
     @staticmethod
     def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder:
         """
-        Builds a new FileSource using a
-        :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` to read record-by-record
+        Builds a new FileSource using a :class:`~FileSource.StreamFormat` to read record-by-record
         from a file stream.
 
         When possible, stream-based formats are generally easier (preferable) to file-based
@@ -632,8 +628,8 @@ class FileSink(Sink, SupportsPreprocessing):
     on every checkpoint or use time or a property of the element to determine the bucket directory.
     The default BucketAssigner is a DateTimeBucketAssigner which will create one new
     bucket every hour. You can specify a custom BucketAssigner using the
-    :func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`,
-    after calling :class:`~pyflink.datastream.connectors.FileSink.for_row_format`.
+    :func:`~FileSink.RowFormatBuilder.with_bucket_assigner`, after calling
+    :class:`~FileSink.for_row_format`.
 
     The names of the part files can be defined using OutputFileConfig. This
     configuration contains a part prefix and a part suffix that will be used with a random uid
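
To make the builder flow described in the FileSource docstring concrete, a short sketch that creates a source in the default bounded mode and switches it to continuous streaming (the interval and paths are illustrative):

    from pyflink.common import Duration
    from pyflink.datastream.connectors.file_system import FileSource
    from pyflink.datastream.formats.csv import CsvReaderFormat, CsvSchema

    schema = CsvSchema.builder().add_string_column('line').build()
    # for_record_stream_format returns a FileSourceBuilder; the source is
    # bounded/batch by default, monitor_continuously puts it into streaming mode.
    source = FileSource \
        .for_record_stream_format(CsvReaderFormat.for_schema(schema), '/tmp/in') \
        .monitor_continuously(Duration.of_seconds(10)) \
        .build()
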
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py b/flink-python/pyflink/datastream/connectors/kinesis.py
index ebf4666f2d1..355c954893e 100644
--- a/flink-python/pyflink/datastream/connectors/kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -262,6 +262,7 @@ class KinesisStreamsSinkBuilder(object):
 
     Example:
     ::
+
         >>> from pyflink.common.serialization import SimpleStringSchema
         >>> sink_properties = {"aws.region": "eu-west-1"}
         >>> sink = KinesisStreamsSink.builder() \\
@@ -273,13 +274,13 @@ class KinesisStreamsSinkBuilder(object):
 
     If the following parameters are not set in this builder, the following defaults will be used:
 
-    - maxBatchSize will be 500
-    - maxInFlightRequests will be 50
-    - maxBufferedRequests will be 10000
-    - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
-    - maxTimeInBufferMS will be 5000ms
-    - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
-    - failOnError will be false
+        - maxBatchSize will be 500
+        - maxInFlightRequests will be 50
+        - maxBufferedRequests will be 10000
+        - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
+        - maxTimeInBufferMS will be 5000ms
+        - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
+        - failOnError will be false
     """
 
     def __init__(self):
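
A sketch that overrides a couple of the defaults listed in the docstring; the set_max_batch_size/set_fail_on_error setter names mirror the Java AsyncSinkBaseBuilder and are assumed here, and the stream name is hypothetical:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.kinesis import (
        KinesisStreamsSink, PartitionKeyGenerator)

    sink = KinesisStreamsSink.builder() \
        .set_kinesis_client_properties({"aws.region": "eu-west-1"}) \
        .set_serialization_schema(SimpleStringSchema()) \
        .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
        .set_stream_name("your_stream_name") \
        .set_max_batch_size(250) \
        .set_fail_on_error(True) \
        .build()
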
diff --git a/flink-python/pyflink/datastream/formats/__init__.py b/flink-python/pyflink/datastream/formats/__init__.py
index 48d582cfaff..65b48d4d79b 100644
--- a/flink-python/pyflink/datastream/formats/__init__.py
+++ b/flink-python/pyflink/datastream/formats/__init__.py
@@ -15,20 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from .avro import AvroSchema, AvroInputFormat, AvroWriters, GenericRecordAvroTypeInfo
-from .csv import CsvSchema, CsvSchemaBuilder, CsvReaderFormat, CsvBulkWriter
-from .parquet import AvroParquetReaders, AvroParquetWriters, ParquetColumnarRowInputFormat
-
-__all__ = [
-    'AvroInputFormat',
-    'AvroParquetReaders',
-    'AvroParquetWriters',
-    'AvroSchema',
-    'AvroWriters',
-    'CsvBulkWriter',
-    'CsvReaderFormat',
-    'CsvSchema',
-    'CsvSchemaBuilder',
-    'GenericRecordAvroTypeInfo',
-    'ParquetColumnarRowInputFormat'
-]
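
With these re-exports removed, package-level imports stop resolving; user code must name the format submodule explicitly, e.g.:

    # no longer available after this commit:
    # from pyflink.datastream.formats import ParquetColumnarRowInputFormat
    from pyflink.datastream.formats.parquet import ParquetColumnarRowInputFormat
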
diff --git a/flink-python/pyflink/datastream/formats/avro.py b/flink-python/pyflink/datastream/formats/avro.py
index 2ffb5e6d407..66a16eb9f7d 100644
--- a/flink-python/pyflink/datastream/formats/avro.py
+++ b/flink-python/pyflink/datastream/formats/avro.py
@@ -24,6 +24,14 @@ from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import get_field_value
 
 
+__all__ = [
+    'AvroInputFormat',
+    'AvroSchema',
+    'AvroWriters',
+    'GenericRecordAvroTypeInfo'
+]
+
+
 class AvroSchema(object):
     """
     Avro Schema class contains Java org.apache.avro.Schema.
@@ -103,7 +111,7 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
     def __init__(self, path: str, schema: 'AvroSchema'):
         """
         :param path: The path to Avro data file.
-        :param schema: The :class:`Schema` of generic record.
+        :param schema: The :class:`AvroSchema` of generic record.
         """
         jvm = get_gateway().jvm
         j_avro_input_format = jvm.org.apache.flink.formats.avro.AvroInputFormat(
@@ -111,7 +119,6 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
             get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord)
         )
         super().__init__(j_avro_input_format)
-        self._schema = schema
         self._type_info = GenericRecordAvroTypeInfo(schema)
 
     def get_produced_type(self) -> GenericRecordAvroTypeInfo:
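
A minimal sketch of reading avro files with the classes exported above (the schema and path are hypothetical):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.formats.avro import AvroInputFormat, AvroSchema

    env = StreamExecutionEnvironment.get_execution_environment()
    schema = AvroSchema.parse_string("""
        {"type": "record", "name": "User",
         "fields": [{"name": "name", "type": "string"}]}
    """)
    # AvroInputFormat streams GenericRecord data from the given file.
    ds = env.create_input(AvroInputFormat('/tmp/users.avro', schema))
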
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index d845bde6449..a0d0f803440 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -23,6 +23,13 @@ from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowData
 from pyflink.java_gateway import get_gateway
 from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
 
+__all__ = [
+    'CsvBulkWriter',
+    'CsvReaderFormat',
+    'CsvSchema',
+    'CsvSchemaBuilder'
+]
+
 
 class CsvSchema(object):
     """
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index b23198ca509..39bc6a7d321 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -22,6 +22,13 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table.types import RowType, _to_java_data_type
 
 
+__all__ = [
+    'AvroParquetReaders',
+    'AvroParquetWriters',
+    'ParquetColumnarRowInputFormat'
+]
+
+
 class AvroParquetReaders(object):
     """
     A convenience builder to create a reader format that reads individual Avro records from a
diff --git a/flink-python/pyflink/datastream/formats/tests/test_avro.py b/flink-python/pyflink/datastream/formats/tests/test_avro.py
index ade29e13dbd..01e7c1ea66a 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_avro.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_avro.py
@@ -26,7 +26,7 @@ from py4j.java_gateway import JavaObject, java_import
 
 from pyflink.datastream import MapFunction
 from pyflink.datastream.connectors.file_system import FileSink
-from pyflink.datastream.formats import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
+from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
     AvroInputFormat
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index 326095392e4..e4b14a9ec35 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -23,7 +23,7 @@ from typing import Tuple, List
 from pyflink.common import WatermarkStrategy, Types
 from pyflink.datastream import MapFunction
 from pyflink.datastream.connectors.file_system import FileSource, FileSink
-from pyflink.datastream.formats import CsvSchema, CsvReaderFormat, CsvBulkWriter
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriter
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.table import DataTypes
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index 97e8ffaf0a4..b914d185e9f 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -34,7 +34,7 @@ from pyflink.datastream.formats.tests.test_avro import \
     _create_map_avro_schema_and_records, _create_array_avro_schema_and_records, \
     _create_union_avro_schema_and_records, _create_enum_avro_schema_and_records, \
     _create_basic_avro_schema_and_records, _import_avro_classes
-from pyflink.datastream.formats import GenericRecordAvroTypeInfo, AvroSchema
+from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
 from pyflink.datastream.formats.parquet import AvroParquetReaders, ParquetColumnarRowInputFormat, \
     AvroParquetWriters
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction