Posted to commits@flink.apache.org by di...@apache.org on 2022/08/10 11:02:17 UTC
[flink] branch master updated: [hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 07858933aac [hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files
07858933aac is described below
commit 07858933aac71715f9f39901cab7716d298dd28f
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Aug 10 11:29:52 2022 +0800
[hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files
---
.../docs/connectors/datastream/filesystem.md | 2 +-
.../docs/connectors/datastream/formats/csv.md | 4 +-
.../python/datastream/intro_to_datastream_api.md | 4 +-
.../docs/connectors/datastream/filesystem.md | 2 +-
.../docs/connectors/datastream/formats/csv.md | 4 +-
.../python/datastream/intro_to_datastream_api.md | 4 +-
.../python/datastream/data_stream_job.py | 3 +-
flink-python/pyflink/common/__init__.py | 50 ++-
flink-python/pyflink/common/io.py | 32 ++
flink-python/pyflink/common/serialization.py | 345 ++-------------------
.../common/tests/test_serialization_schemas.py | 86 +----
flink-python/pyflink/common/utils.py | 25 ++
flink-python/pyflink/datastream/__init__.py | 17 +-
.../pyflink/datastream/connectors/file_system.py | 42 +--
.../datastream/connectors/tests/test_kafka.py | 7 +-
.../datastream/connectors/tests/test_rabbitmq.py | 3 +-
flink-python/pyflink/datastream/formats/avro.py | 99 +++++-
flink-python/pyflink/datastream/formats/csv.py | 170 ++++++++--
flink-python/pyflink/datastream/formats/json.py | 150 +++++++++
flink-python/pyflink/datastream/formats/orc.py | 24 +-
flink-python/pyflink/datastream/formats/parquet.py | 37 ++-
.../pyflink/datastream/formats/tests/test_avro.py | 6 +-
.../pyflink/datastream/formats/tests/test_csv.py | 56 +++-
.../pyflink/datastream/formats/tests/test_json.py | 57 ++++
.../datastream/formats/tests/test_parquet.py | 4 +-
.../datastream/stream_execution_environment.py | 7 +-
.../tests/test_stream_execution_environment.py | 2 +-
flink-python/pyflink/datastream/utils.py | 9 -
.../datastream/connectors/kafka_avro_format.py | 3 +-
.../datastream/connectors/kafka_csv_format.py | 4 +-
.../datastream/connectors/kafka_json_format.py | 3 +-
31 files changed, 709 insertions(+), 552 deletions(-)
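In practical terms, this change relocates the canonical import paths of the row (de)serialization schemas from `pyflink.common.serialization` into the per-format modules, while keeping the old names importable via the `_install()` shim added to `flink-python/pyflink/common/__init__.py` (shown further below). A minimal sketch of the resulting import surface:

```python
# Canonical locations after this commit:
from pyflink.datastream.formats.json import (
    JsonRowDeserializationSchema, JsonRowSerializationSchema)
from pyflink.datastream.formats.csv import (
    CsvRowDeserializationSchema, CsvRowSerializationSchema)
from pyflink.datastream.formats.avro import (
    AvroRowDeserializationSchema, AvroRowSerializationSchema)

# Old import paths still resolve, for backward compatibility:
from pyflink.common import JsonRowSerializationSchema
```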
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index d48e84f52d8..20d7a0bee38 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -570,7 +570,7 @@ data_stream = ...
avro_type_info = GenericRecordAvroTypeInfo(schema)
sink = FileSink \
- .for_bulk_format(OUTPUT_BASE_PATH, AvroWriters.for_generic_record(schema)) \
+ .for_bulk_format(OUTPUT_BASE_PATH, AvroBulkWriters.for_generic_record(schema)) \
.build()
# 必须通过 map 操作来指定其 Avro 类型信息,用于数据的序列化
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 568757f7137..2d3ef10455f 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -138,7 +138,7 @@ The corresponding CSV file:
Similar to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continuous and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
-For PyFlink users, `CsvBulkWriter` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
+For PyFlink users, `CsvBulkWriters` can be used to create a `BulkWriterFactory` to write `Row` records to files in CSV format.
Note that if the operator preceding the sink produces `RowData` records, e.g. a CSV source, the records need to be converted to `Row` records before being written to the sink.
```python
schema = CsvSchema.builder()
@@ -148,7 +148,7 @@ schema = CsvSchema.builder()
.build()
sink = FileSink.for_bulk_format(
- OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+ OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
# If ds is a source stream producing RowData records, a map can be added to convert RowData records into Row records.
ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
index 5eefc0376b2..d87e1f4ee2e 100644
--- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -154,10 +154,10 @@ will be `Types.PICKLED_BYTE_ARRAY()`.
You can also create a `DataStream` using DataStream connectors with the `add_source` method, as follows:
```python
-from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -305,7 +305,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
```python
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
-from pyflink.common.serialization import JsonRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowSerializationSchema
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
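For context, the surrounding docs example reads roughly as follows once the corrected import is in place (the jar path, topic, and broker settings are the placeholders used elsewhere in the docs):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used as it's a fat jar and can avoid dependency issues
# env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)
```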
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index 41eb054015e..e5e39646a43 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -569,7 +569,7 @@ data_stream = ...
avro_type_info = GenericRecordAvroTypeInfo(schema)
sink = FileSink \
- .for_bulk_format(OUTPUT_BASE_PATH, AvroWriters.for_generic_record(schema)) \
+ .for_bulk_format(OUTPUT_BASE_PATH, AvroBulkWriters.for_generic_record(schema)) \
.build()
# A map to indicate its Avro type info is necessary for serialization
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 3b9a374635d..60cacc9d5c8 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -138,7 +138,7 @@ The corresponding CSV file:
Similar to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continuous and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
-For PyFlink users, `CsvBulkWriter` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
+For PyFlink users, `CsvBulkWriters` can be used to create a `BulkWriterFactory` to write `Row` records to files in CSV format.
Note that if the operator preceding the sink produces `RowData` records, e.g. a CSV source, the records need to be converted to `Row` records before being written to the sink.
```python
schema = CsvSchema.builder() \
@@ -148,7 +148,7 @@ schema = CsvSchema.builder() \
.build()
sink = FileSink.for_bulk_format(
- OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+ OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
# If ds is a source stream producing RowData records, a map can be added to convert RowData records into Row records.
ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
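Below is a fuller, self-contained sketch of the renamed `CsvBulkWriters` API; only `CsvBulkWriters.for_schema` comes from this change, while the schema columns, output directory, and sample data are illustrative:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.csv import CsvSchema, CsvBulkWriters
from pyflink.table import DataTypes

env = StreamExecutionEnvironment.get_execution_environment()

schema = CsvSchema.builder() \
    .add_number_column('id', number_type=DataTypes.INT()) \
    .add_string_column('name') \
    .build()

sink = FileSink.for_bulk_format(
    '/tmp/csv-output', CsvBulkWriters.for_schema(schema)).build()

ds = env.from_collection(
    [(1, 'flink'), (2, 'pyflink')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
# The identity map pins the output type to the schema's type info.
ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
env.execute()
```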
diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
index daedb0f418b..61b0d56b451 100644
--- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -154,10 +154,10 @@ will be `Types.PICKLED_BYTE_ARRAY()`.
You can also create a `DataStream` using DataStream connectors with the `add_source` method, as follows:
```python
-from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -305,7 +305,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
```python
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
-from pyflink.common.serialization import JsonRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowSerializationSchema
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
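The matching sink-side example from the docs, with the relocated import (topic and broker are the docs' placeholders; `ds` is an existing `DataStream` of `Row(INT, STRING)` records):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema

serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
    type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(
    topic='test_sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

# ds is an existing DataStream of Row(INT, STRING) records
ds.add_sink(kafka_producer)
```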
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
index 6bb27d0d9cd..880c71e3145 100644
--- a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -19,11 +19,12 @@
from typing import Any
from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.functions import KeyedProcessFunction
from functions import MyKeySelector
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index f120c406b43..19f76febc7d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -39,10 +39,14 @@ Common classes used by both Flink DataStream API and Table API:
- :class:`SerializationSchema`:
Base class that describes how to turn a data object into a different serialized representation.
Most data sinks (for example Apache Kafka) require the data to be handed to them in a specific
- format (for example as byte strings). See :class:`JsonRowSerializationSchema`,
- :class:`JsonRowDeserializationSchema`, :class:`CsvRowSerializationSchema`,
- :class:`CsvRowDeserializationSchema`, :class:`AvroRowSerializationSchema`,
- :class:`AvroRowDeserializationSchema` and :class:`SimpleStringSchema` for more details.
+ format (for example as byte strings). See
+ :class:`~pyflink.datastream.formats.json.JsonRowSerializationSchema`,
+ :class:`~pyflink.datastream.formats.json.JsonRowDeserializationSchema`,
+ :class:`~pyflink.datastream.formats.csv.CsvRowSerializationSchema`,
+ :class:`~pyflink.datastream.formats.csv.CsvRowDeserializationSchema`,
+ :class:`~pyflink.datastream.formats.avro.AvroRowSerializationSchema`,
+ :class:`~pyflink.datastream.formats.avro.AvroRowDeserializationSchema` and
+ :class:`~SimpleStringSchema` for more details.
"""
from pyflink.common.completable_future import CompletableFuture
from pyflink.common.config_options import ConfigOption, ConfigOptions
@@ -56,9 +60,7 @@ from pyflink.common.job_id import JobID
from pyflink.common.job_status import JobStatus
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
from pyflink.common.serialization import SerializationSchema, DeserializationSchema, \
- SimpleStringSchema, JsonRowSerializationSchema, JsonRowDeserializationSchema, \
- CsvRowSerializationSchema, CsvRowDeserializationSchema, AvroRowSerializationSchema, \
- AvroRowDeserializationSchema, Encoder
+ SimpleStringSchema, Encoder
from pyflink.common.serializer import TypeSerializer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.common.types import Row, RowKind
@@ -77,12 +79,6 @@ __all__ = [
'SerializationSchema',
'DeserializationSchema',
'SimpleStringSchema',
- 'JsonRowSerializationSchema',
- 'JsonRowDeserializationSchema',
- 'CsvRowSerializationSchema',
- 'CsvRowDeserializationSchema',
- 'AvroRowSerializationSchema',
- 'AvroRowDeserializationSchema',
'Encoder',
'CompletableFuture',
'ExecutionMode',
@@ -100,5 +96,31 @@ __all__ = [
"Instant",
"Time",
"AssignerWithPeriodicWatermarksWrapper"
- ""
]
+
+
+def _install():
+ from pyflink import common
+
+ # json
+ from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+ from pyflink.datastream.formats.json import JsonRowSerializationSchema
+ setattr(common, 'JsonRowDeserializationSchema', JsonRowDeserializationSchema)
+ setattr(common, 'JsonRowSerializationSchema', JsonRowSerializationSchema)
+
+ # csv
+ from pyflink.datastream.formats.csv import CsvRowDeserializationSchema
+ from pyflink.datastream.formats.csv import CsvRowSerializationSchema
+ setattr(common, 'CsvRowDeserializationSchema', CsvRowDeserializationSchema)
+ setattr(common, 'CsvRowSerializationSchema', CsvRowSerializationSchema)
+
+ # avro
+ from pyflink.datastream.formats.avro import AvroRowDeserializationSchema
+ from pyflink.datastream.formats.avro import AvroRowSerializationSchema
+ setattr(common, 'AvroRowDeserializationSchema', AvroRowDeserializationSchema)
+ setattr(common, 'AvroRowSerializationSchema', AvroRowSerializationSchema)
+
+
+# for backward compatibility
+_install()
+del _install
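The shim simply aliases the relocated classes back onto the `pyflink.common` module object, so pre-existing imports resolve to the very same class objects. A quick way to see this, assuming a working PyFlink installation:

```python
import pyflink.common
from pyflink.datastream.formats.json import JsonRowSerializationSchema

# _install() ran when pyflink.common was first imported, so the
# backward-compatible name is the relocated class itself.
assert pyflink.common.JsonRowSerializationSchema is JsonRowSerializationSchema
```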
diff --git a/flink-python/pyflink/common/io.py b/flink-python/pyflink/common/io.py
new file mode 100644
index 00000000000..97cf803685c
--- /dev/null
+++ b/flink-python/pyflink/common/io.py
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.utils import JavaObjectWrapper
+
+__all__ = [
+ 'InputFormat'
+]
+
+
+class InputFormat(JavaObjectWrapper):
+ """
+ The Python wrapper of Java InputFormat interface, which is the base interface for data sources
+ that produce records.
+ """
+
+ def __init__(self, j_input_format):
+ super().__init__(j_input_format)
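`InputFormat` itself is only a holder for the underlying Java object; concrete formats such as `AvroInputFormat` (see `formats/avro.py` below) subclass it and are consumed with `StreamExecutionEnvironment.create_input`. A hedged sketch, with a hypothetical schema and input path:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.formats.avro import AvroInputFormat, AvroSchema

env = StreamExecutionEnvironment.get_execution_environment()

# Hypothetical Avro schema and file path, for illustration only.
schema = AvroSchema.parse_string("""
{
  "type": "record", "name": "User",
  "fields": [{"name": "name", "type": "string"}]
}
""")
ds = env.create_input(AvroInputFormat('/tmp/users.avro', schema))
```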
diff --git a/flink-python/pyflink/common/serialization.py b/flink-python/pyflink/common/serialization.py
index e70bb8ffd3a..23613e25cc7 100644
--- a/flink-python/pyflink/common/serialization.py
+++ b/flink-python/pyflink/common/serialization.py
@@ -15,18 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from py4j.java_gateway import java_import
-from pyflink.common import typeinfo
-from pyflink.common.typeinfo import TypeInformation
-
-from pyflink.util.java_utils import load_java_class
+from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
-__all__ = ['SerializationSchema', 'DeserializationSchema', 'SimpleStringSchema',
- 'JsonRowSerializationSchema', 'JsonRowDeserializationSchema',
- 'CsvRowSerializationSchema', 'CsvRowDeserializationSchema',
- 'AvroRowSerializationSchema', 'AvroRowDeserializationSchema', 'Encoder']
+__all__ = [
+ 'SerializationSchema',
+ 'DeserializationSchema',
+ 'SimpleStringSchema',
+ 'Encoder',
+ 'BulkWriterFactory'
+]
class SerializationSchema(object):
@@ -69,310 +68,6 @@ class SimpleStringSchema(SerializationSchema, DeserializationSchema):
self, j_deserialization_schema=j_simple_string_serialization_schema)
-class JsonRowDeserializationSchema(DeserializationSchema):
- """
- Deserialization schema from JSON to Flink types.
-
- Deserializes a byte[] message as a JSON object and reads the specified fields.
-
- Failures during deserialization are forwarded as wrapped IOExceptions.
- """
- def __init__(self, j_deserialization_schema):
- super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)
-
- @staticmethod
- def builder():
- """
- A static method to get a Builder for JsonRowDeserializationSchema.
- """
- return JsonRowDeserializationSchema.Builder()
-
- class Builder(object):
- """
- Builder for JsonRowDeserializationSchema.
- """
-
- def __init__(self):
- self._type_info = None
- self._fail_on_missing_field = False
- self._ignore_parse_errors = False
-
- def type_info(self, type_info: TypeInformation):
- """
- Creates a JSON deserialization schema for the given type information.
-
- :param type_info: Type information describing the result type. The field names of Row
- are used to parse the JSON properties.
- """
- self._type_info = type_info
- return self
-
- def json_schema(self, json_schema: str):
- """
- Creates a JSON deserialization schema for the given JSON schema.
-
- :param json_schema: JSON schema describing the result type.
- """
- if json_schema is None:
- raise TypeError("The json_schema must not be None.")
- j_type_info = get_gateway().jvm \
- .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
- self._type_info = typeinfo._from_java_type(j_type_info)
- return self
-
- def fail_on_missing_field(self):
- """
- Configures schema to fail if a JSON field is missing. A missing field is ignored and the
- field is set to null by default.
- """
- self._fail_on_missing_field = True
- return self
-
- def ignore_parse_errors(self):
- """
- Configures schema to skip records with parse errors instead of failing. By default, an
- exception is thrown when parsing JSON fails.
- """
- self._ignore_parse_errors = True
- return self
-
- def build(self):
- JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
- .Builder
- j_builder = JBuilder(self._type_info.get_java_type_info())
-
- if self._fail_on_missing_field:
- j_builder = j_builder.failOnMissingField()
-
- if self._ignore_parse_errors:
- j_builder = j_builder.ignoreParseErrors()
-
- j_deserialization_schema = j_builder.build()
- return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
-
-
-class JsonRowSerializationSchema(SerializationSchema):
- """
- Serialization schema that serializes an object of Flink types into JSON bytes. Serializes the
- input Flink object into a JSON string and converts it into byte[].
-
- The resulting byte[] messages can be deserialized using JsonRowDeserializationSchema.
- """
-
- def __init__(self, j_serialization_schema):
- super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)
-
- @staticmethod
- def builder():
- return JsonRowSerializationSchema.Builder()
-
- class Builder(object):
- """
- Builder for JsonRowSerializationSchema.
- """
- def __init__(self):
- self._type_info = None
-
- def with_type_info(self, type_info: TypeInformation):
- """
- Creates a JSON serialization schema for the given type information.
-
- :param type_info: Type information describing the result type. The field names of Row
- are used to parse the JSON properties.
- """
- self._type_info = type_info
- return self
-
- def build(self):
- if self._type_info is None:
- raise TypeError("Typeinfo should be set.")
-
- j_builder = get_gateway().jvm \
- .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()
-
- j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
- return JsonRowSerializationSchema(j_serialization_schema=j_schema)
-
-
-class CsvRowDeserializationSchema(DeserializationSchema):
- """
- Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode and
- converts it to Row.
-
- Failures during deserialization are forwarded as wrapped IOExceptions.
- """
-
- def __init__(self, j_deserialization_schema):
- super(CsvRowDeserializationSchema, self).__init__(
- j_deserialization_schema=j_deserialization_schema)
-
- class Builder(object):
- """
- A builder for creating a CsvRowDeserializationSchema.
- """
- def __init__(self, type_info: TypeInformation):
- if type_info is None:
- raise TypeError("Type information must not be None")
- self._j_builder = get_gateway().jvm\
- .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
- type_info.get_java_type_info())
-
- def set_field_delimiter(self, delimiter: str):
- self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
- return self
-
- def set_allow_comments(self, allow_comments: bool):
- self._j_builder = self._j_builder.setAllowComments(allow_comments)
- return self
-
- def set_array_element_delimiter(self, delimiter: str):
- self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
- return self
-
- def set_quote_character(self, c: str):
- self._j_builder = self._j_builder.setQuoteCharacter(c)
- return self
-
- def set_escape_character(self, c: str):
- self._j_builder = self._j_builder.setEscapeCharacter(c)
- return self
-
- def set_null_literal(self, null_literal: str):
- self._j_builder = self._j_builder.setNullLiteral(null_literal)
- return self
-
- def set_ignore_parse_errors(self, ignore_parse_errors: bool):
- self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
- return self
-
- def build(self):
- j_csv_row_deserialization_schema = self._j_builder.build()
- return CsvRowDeserializationSchema(
- j_deserialization_schema=j_csv_row_deserialization_schema)
-
-
-class CsvRowSerializationSchema(SerializationSchema):
- """
- Serialization schema that serializes an object of Flink types into CSV bytes. Serializes the
- input row into an ObjectNode and converts it into byte[].
-
- Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
- """
- def __init__(self, j_csv_row_serialization_schema):
- super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
-
- class Builder(object):
- """
- A builder for creating a CsvRowSerializationSchema.
- """
- def __init__(self, type_info: TypeInformation):
- if type_info is None:
- raise TypeError("Type information must not be None")
- self._j_builder = get_gateway().jvm\
- .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
- type_info.get_java_type_info())
-
- def set_field_delimiter(self, c: str):
- self._j_builder = self._j_builder.setFieldDelimiter(c)
- return self
-
- def set_line_delimiter(self, delimiter: str):
- self._j_builder = self._j_builder.setLineDelimiter(delimiter)
- return self
-
- def set_array_element_delimiter(self, delimiter: str):
- self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
- return self
-
- def disable_quote_character(self):
- self._j_builder = self._j_builder.disableQuoteCharacter()
- return self
-
- def set_quote_character(self, c: str):
- self._j_builder = self._j_builder.setQuoteCharacter(c)
- return self
-
- def set_escape_character(self, c: str):
- self._j_builder = self._j_builder.setEscapeCharacter(c)
- return self
-
- def set_null_literal(self, s: str):
- self._j_builder = self._j_builder.setNullLiteral(s)
- return self
-
- def build(self):
- j_serialization_schema = self._j_builder.build()
- return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)
-
-
-class AvroRowDeserializationSchema(DeserializationSchema):
- """
- Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
- Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL API.
-
- Projects with Avro records containing logical date/time types need to add a JodaTime dependency.
- """
- def __init__(self, record_class: str = None, avro_schema_string: str = None):
- """
- Creates an Avro deserialization schema for the given specific record class or Avro schema
- string. Having the concrete Avro record class might improve performance.
-
- :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
- :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
- """
-
- if avro_schema_string is None and record_class is None:
- raise TypeError("record_class or avro_schema_string should be specified.")
- j_deserialization_schema = None
- if record_class is not None:
- gateway = get_gateway()
- java_import(gateway.jvm, record_class)
- j_record_class = load_java_class(record_class)
- JAvroRowDeserializationSchema = get_gateway().jvm \
- .org.apache.flink.formats.avro.AvroRowDeserializationSchema
- j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
-
- elif avro_schema_string is not None:
- JAvroRowDeserializationSchema = get_gateway().jvm \
- .org.apache.flink.formats.avro.AvroRowDeserializationSchema
- j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
-
- super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
-
-
-class AvroRowSerializationSchema(SerializationSchema):
- """
- Serialization schema that serializes to Avro binary format.
- """
-
- def __init__(self, record_class: str = None, avro_schema_string: str = None):
- """
- Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
- record class.
-
- :param record_class: Avro record class used to serialize Flink's row to Avro's record.
- :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
- """
- if avro_schema_string is None and record_class is None:
- raise TypeError("record_class or avro_schema_string should be specified.")
-
- j_serialization_schema = None
- if record_class is not None:
- gateway = get_gateway()
- java_import(gateway.jvm, record_class)
- j_record_class = load_java_class(record_class)
- JAvroRowSerializationSchema = get_gateway().jvm \
- .org.apache.flink.formats.avro.AvroRowSerializationSchema
- j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
-
- elif avro_schema_string is not None:
- JAvroRowSerializationSchema = get_gateway().jvm \
- .org.apache.flink.formats.avro.AvroRowSerializationSchema
- j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)
-
- super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)
-
-
class Encoder(object):
"""
Encoder is used by the file sink to perform the actual writing of the
@@ -391,3 +86,27 @@ class Encoder(object):
j_encoder = get_gateway().jvm.org.apache.flink.api.common.serialization.\
SimpleStringEncoder(charset_name)
return Encoder(j_encoder)
+
+
+class BulkWriterFactory(JavaObjectWrapper):
+ """
+ The Python wrapper of Java BulkWriter.Factory interface, which is the base interface for data
+ sinks that write records into files in a bulk manner.
+ """
+
+ def __init__(self, j_bulk_writer_factory):
+ super().__init__(j_bulk_writer_factory)
+
+
+class RowDataBulkWriterFactory(BulkWriterFactory):
+ """
+ A :class:`~BulkWriterFactory` that receives records of RowData type. This indicates that
+ Row records from Python must first be converted to RowData.
+ """
+
+ def __init__(self, j_bulk_writer_factory, row_type):
+ super().__init__(j_bulk_writer_factory)
+ self._row_type = row_type
+
+ def get_row_type(self):
+ return self._row_type
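`RowDataBulkWriterFactory` is the variant a format module returns when the underlying Java writer consumes `RowData`; the recorded row type is what lets the sink insert the `Row` to `RowData` conversion. A small check of that contract (a sketch assuming a working PyFlink installation; the one-column schema is illustrative):

```python
from pyflink.common.serialization import RowDataBulkWriterFactory
from pyflink.datastream.formats.csv import CsvSchema, CsvBulkWriters

schema = CsvSchema.builder().add_string_column('name').build()
factory = CsvBulkWriters.for_schema(schema)

# The CSV writer works on RowData on the Java side, so the factory carries
# the row type used to convert Python Row records first.
assert isinstance(factory, RowDataBulkWriterFactory)
print(factory.get_row_type())
```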
diff --git a/flink-python/pyflink/common/tests/test_serialization_schemas.py b/flink-python/pyflink/common/tests/test_serialization_schemas.py
index 9544bd13dba..702241a6039 100644
--- a/flink-python/pyflink/common/tests/test_serialization_schemas.py
+++ b/flink-python/pyflink/common/tests/test_serialization_schemas.py
@@ -15,15 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pyflink.common.serialization import JsonRowSerializationSchema, \
- JsonRowDeserializationSchema, CsvRowSerializationSchema, CsvRowDeserializationSchema, \
- SimpleStringSchema
-from pyflink.common.typeinfo import Types
-from pyflink.java_gateway import get_gateway
+from pyflink.common.serialization import SimpleStringSchema
from pyflink.testing.test_case_utils import PyFlinkTestCase
-class TestRowSerializationSchemas(PyFlinkTestCase):
+class SimpleStringSchemaTests(PyFlinkTestCase):
def test_simple_string_schema(self):
expected_string = 'test string'
@@ -33,81 +29,3 @@ class TestRowSerializationSchemas(PyFlinkTestCase):
self.assertEqual(expected_string, simple_string_schema._j_deserialization_schema
.deserialize(expected_string.encode(encoding='utf-8')))
-
- def test_json_row_serialization_deserialization_schema(self):
- jvm = get_gateway().jvm
- jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\"}",
- "{\"svt\":\"2020-02-24T12:58:09.209+0800\", "
- "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},\"ids\":[1, 2, 3]}",
- "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}"]
- expected_jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}",
- "{\"svt\":\"2020-02-24T12:58:09.209+0800\","
- "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
- "\"ids\":[1,2,3]}",
- "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}"]
-
- row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
- [Types.STRING(),
- Types.ROW_NAMED(['id'], [Types.STRING()]),
- Types.PRIMITIVE_ARRAY(Types.INT())])
-
- json_row_serialization_schema = JsonRowSerializationSchema.builder() \
- .with_type_info(row_schema).build()
- json_row_deserialization_schema = JsonRowDeserializationSchema.builder() \
- .type_info(row_schema).build()
- json_row_serialization_schema._j_serialization_schema.open(
- jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
- json_row_deserialization_schema._j_deserialization_schema.open(
- jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-
- for i in range(len(jsons)):
- j_row = json_row_deserialization_schema._j_deserialization_schema\
- .deserialize(bytes(jsons[i], encoding='utf-8'))
- result = str(json_row_serialization_schema._j_serialization_schema.serialize(j_row),
- encoding='utf-8')
- self.assertEqual(expected_jsons[i], result)
-
- def test_csv_row_serialization_schema(self):
- jvm = get_gateway().jvm
- JRow = jvm.org.apache.flink.types.Row
-
- j_row = JRow(3)
- j_row.setField(0, "BEGIN")
- j_row.setField(2, "END")
-
- def field_assertion(field_info, csv_value, value, field_delimiter):
- row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
- expected_csv = "BEGIN" + field_delimiter + csv_value + field_delimiter + "END\n"
- j_row.setField(1, value)
-
- csv_row_serialization_schema = CsvRowSerializationSchema.Builder(row_info)\
- .set_escape_character('*').set_quote_character('\'')\
- .set_array_element_delimiter(':').set_field_delimiter(';').build()
- csv_row_deserialization_schema = CsvRowDeserializationSchema.Builder(row_info)\
- .set_escape_character('*').set_quote_character('\'')\
- .set_array_element_delimiter(':').set_field_delimiter(';').build()
- csv_row_serialization_schema._j_serialization_schema.open(
- jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
- csv_row_deserialization_schema._j_deserialization_schema.open(
- jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-
- serialized_bytes = csv_row_serialization_schema._j_serialization_schema.serialize(j_row)
- self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))
-
- j_deserialized_row = csv_row_deserialization_schema._j_deserialization_schema\
- .deserialize(expected_csv.encode("utf-8"))
- self.assertTrue(j_row.equals(j_deserialized_row))
-
- field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
- field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
- field_assertion(Types.INT(), "12", 12, ";")
-
- test_j_row = JRow(2)
- test_j_row.setField(0, "1")
- test_j_row.setField(1, "hello")
-
- field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello'", test_j_row, ";")
- test_j_row.setField(1, "hello world")
- field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello world'", test_j_row,
- ";")
- field_assertion(Types.STRING(), "null", "null", ";")
diff --git a/flink-python/pyflink/common/utils.py b/flink-python/pyflink/common/utils.py
new file mode 100644
index 00000000000..05e047b64a8
--- /dev/null
+++ b/flink-python/pyflink/common/utils.py
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+class JavaObjectWrapper(object):
+
+ def __init__(self, j_object):
+ self._j_object = j_object
+
+ def get_java_object(self):
+ return self._j_object
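The wrapper is deliberately minimal: classes such as `InputFormat` and `BulkWriterFactory` subclass it and hand the wrapped py4j object to whatever Java API needs it. A hypothetical subclass, for illustration only:

```python
from pyflink.common.utils import JavaObjectWrapper

class MyFormatWrapper(JavaObjectWrapper):  # hypothetical subclass
    def __init__(self, j_format):
        super().__init__(j_format)

# In practice j_format would be a py4j JavaObject obtained via the gateway.
wrapper = MyFormatWrapper(j_format=None)
assert wrapper.get_java_object() is None
```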
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 436e8677666..b9ce763bd53 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -215,21 +215,22 @@ Classes to define formats used together with source & sink:
- :class:`formats.csv.CsvReaderFormat`:
A :class:`~connectors.file_system.StreamFormat` to read CSV files into Row data.
- :class:`formats.csv.CsvBulkWriter`:
- Creates :class:`~connectors.file_system.BulkWriterFactory` to write Row data into CSV files.
+ Creates :class:`~pyflink.common.serialization.BulkWriterFactory` to write Row data into CSV
+ files.
- :class:`formats.avro.GenericRecordAvroTypeInfo`:
A :class:`~pyflink.common.typeinfo.TypeInformation` to indicate vanilla Python records will be
translated to GenericRecordAvroTypeInfo on the Java side.
- :class:`formats.avro.AvroInputFormat`:
- A :class:`~connector.file_system.InputFormat` to read avro files in a streaming fashion.
+ An InputFormat to read avro files in a streaming fashion.
- :class:`formats.avro.AvroWriters`:
- A class to provide :class:`~connector.file_system.BulkWriterFactory` to write vanilla Python
- objects into avro files in a batch fashion.
+ A class to provide :class:`~pyflink.common.serialization.BulkWriterFactory` to write vanilla
+ Python objects into avro files in a batch fashion.
- :class:`formats.parquet.ParquetColumnarRowInputFormat`:
A :class:`~connectors.file_system.BulkFormat` to read columnar parquet files into Row data in
a batch-processing fashion.
- :class:`formats.parquet.ParquetBulkWriter`:
- Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
- Rows with a defined RowType into Parquet files in a batch fashion.
+ Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+ writes Rows with a defined RowType into Parquet files in a batch fashion.
- :class:`formats.parquet.AvroParquetReaders`:
A convenience builder to create reader format that reads individual Avro records from a
Parquet stream. Only GenericRecord is supported in PyFlink.
@@ -237,8 +238,8 @@ Classes to define formats used together with source & sink:
Convenience builder to create ParquetWriterFactory instances for Avro types. Only
GenericRecord is supported in PyFlink.
- :class:`formats.orc.OrcBulkWriters`:
- Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a
- defined :class:`RowType` into Orc files.
+ Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+ writes Row records with a defined :class:`RowType` into Orc files.
Other important classes:
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 1a147495a1d..94fb040d3a8 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -20,6 +20,8 @@ from abc import abstractmethod
from typing import TYPE_CHECKING, Optional
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
+
if TYPE_CHECKING:
from pyflink.table.types import RowType
@@ -27,7 +29,7 @@ from pyflink.common import Duration, Encoder
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTransformer
from pyflink.datastream.functions import SinkFunction
-from pyflink.datastream.utils import JavaObjectWrapper
+from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray
@@ -39,10 +41,8 @@ __all__ = [
'FileSourceBuilder',
'FileSink',
'StreamingFileSink',
- 'BulkFormat',
'StreamFormat',
- 'InputFormat',
- 'BulkWriterFactory',
+ 'BulkFormat',
'FileEnumeratorProvider',
'FileSplitAssignerProvider',
'RollingPolicy',
@@ -165,40 +165,6 @@ class BulkFormat(object):
self._j_bulk_format = j_bulk_format
-class InputFormat(JavaObjectWrapper):
- """
- The Python wrapper of Java InputFormat interface, which is the base interface for data sources
- that produce records.
- """
-
- def __init__(self, j_input_format):
- super().__init__(j_input_format)
-
-
-class BulkWriterFactory(JavaObjectWrapper):
- """
- The Python wrapper of Java BulkWriter.Factory interface, which is the base interface for data
- sinks that write records into files in a bulk manner.
- """
-
- def __init__(self, j_bulk_writer_factory):
- super().__init__(j_bulk_writer_factory)
-
-
-class RowDataBulkWriterFactory(BulkWriterFactory):
- """
- A :class:`BulkWriterFactory` that receives records of RowData type. This indicates that
- Row records from Python must first be converted to RowData.
- """
-
- def __init__(self, j_bulk_writer_factory, row_type: 'RowType'):
- super().__init__(j_bulk_writer_factory)
- self._row_type = row_type
-
- def get_row_type(self) -> 'RowType':
- return self._row_type
-
-
class FileSourceBuilder(object):
"""
The builder for the :class:`~FileSource`, to configure the various behaviors.
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index b36e22092bf..cf9990af354 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -22,9 +22,7 @@ import pyflink.datastream.data_stream as data_stream
from pyflink.common import typeinfo
from pyflink.common.configuration import Configuration
-from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, \
- JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema, \
- JsonRowSerializationSchema, CsvRowSerializationSchema, AvroRowSerializationSchema
+from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.common.watermark_strategy import WatermarkStrategy
@@ -32,6 +30,9 @@ from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
+from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import (
PyFlinkStreamingTestCase,
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py b/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
index de2fca63076..b49e10d7ddd 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
@@ -15,8 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
+from pyflink.common import Types
from pyflink.datastream.connectors.rabbitmq import RMQSink, RMQSource, RMQConnectionConfig
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_field_value
diff --git a/flink-python/pyflink/datastream/formats/avro.py b/flink-python/pyflink/datastream/formats/avro.py
index 66a16eb9f7d..c60902d8c5e 100644
--- a/flink-python/pyflink/datastream/formats/avro.py
+++ b/flink-python/pyflink/datastream/formats/avro.py
@@ -15,20 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from py4j.java_gateway import get_java_class, JavaObject
+from py4j.java_gateway import get_java_class, JavaObject, java_import
+
+from pyflink.common.io import InputFormat
+from pyflink.common.serialization import BulkWriterFactory, SerializationSchema, \
+ DeserializationSchema
from pyflink.common.typeinfo import TypeInformation
-from pyflink.datastream.connectors.file_system import InputFormat, BulkWriterFactory
from pyflink.datastream.utils import ResultTypeQueryable
from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import get_field_value
-
+from pyflink.util.java_utils import get_field_value, load_java_class
__all__ = [
- 'AvroInputFormat',
'AvroSchema',
- 'AvroWriters',
- 'GenericRecordAvroTypeInfo'
+ 'GenericRecordAvroTypeInfo',
+ 'AvroInputFormat',
+ 'AvroBulkWriters',
+ 'AvroRowDeserializationSchema',
+ 'AvroRowSerializationSchema'
]
@@ -85,7 +89,7 @@ class GenericRecordAvroTypeInfo(TypeInformation):
"""
def __init__(self, schema: 'AvroSchema'):
- super().__init__()
+ super(GenericRecordAvroTypeInfo, self).__init__()
self._schema = schema
self._j_typeinfo = get_gateway().jvm.org.apache.flink.formats.avro.typeutils \
.GenericRecordAvroTypeInfo(schema._j_schema)
@@ -118,16 +122,17 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
jvm.org.apache.flink.core.fs.Path(path),
get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord)
)
- super().__init__(j_avro_input_format)
+ super(AvroInputFormat, self).__init__(j_avro_input_format)
self._type_info = GenericRecordAvroTypeInfo(schema)
def get_produced_type(self) -> GenericRecordAvroTypeInfo:
return self._type_info
-class AvroWriters(object):
+class AvroBulkWriters(object):
"""
- Convenience builder to create AvroWriterFactory instances.
+ Convenience builder to create :class:`~pyflink.common.serialization.BulkWriterFactory` for
+ Avro types.
.. versionadded:: 1.16.0
"""
@@ -139,7 +144,7 @@ class AvroWriters(object):
will use the given schema to build and write the records.
Note that to make this work in PyFlink, you need to declare the output type of the
- predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`, and the predecessor
+ predecessor before FileSink to be :class:`~GenericRecordAvroTypeInfo`, and the predecessor
cannot be :meth:`StreamExecutionEnvironment.from_collection`; you can add a pass-through map
function before the sink, as shown in the example below.
@@ -155,7 +160,7 @@ class AvroWriters(object):
>>> avro_type_info = GenericRecordAvroTypeInfo(schema)
>>> ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY())
>>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR, AvroWriters.for_generic_record(schema)).build()
+ ... OUTPUT_DIR, AvroBulkWriters.for_generic_record(schema)).build()
>>> # A map to indicate its Avro type info is necessary for serialization
>>> ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)) \\
... .sink_to(sink)
@@ -168,3 +173,71 @@ class AvroWriters(object):
schema._j_schema
)
return BulkWriterFactory(j_bulk_writer_factory)
+
+
+class AvroRowDeserializationSchema(DeserializationSchema):
+ """
+ Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
+ Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL API.
+
+ Projects with Avro records containing logical date/time types need to add a JodaTime dependency.
+ """
+ def __init__(self, record_class: str = None, avro_schema_string: str = None):
+ """
+ Creates an Avro deserialization schema for the given specific record class or Avro schema
+ string. Having the concrete Avro record class might improve performance.
+
+ :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
+ :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
+ """
+
+ if avro_schema_string is None and record_class is None:
+ raise TypeError("record_class or avro_schema_string should be specified.")
+ j_deserialization_schema = None
+ if record_class is not None:
+ gateway = get_gateway()
+ java_import(gateway.jvm, record_class)
+ j_record_class = load_java_class(record_class)
+ JAvroRowDeserializationSchema = get_gateway().jvm \
+ .org.apache.flink.formats.avro.AvroRowDeserializationSchema
+ j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
+
+ elif avro_schema_string is not None:
+ JAvroRowDeserializationSchema = get_gateway().jvm \
+ .org.apache.flink.formats.avro.AvroRowDeserializationSchema
+ j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
+
+ super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
+
+
+class AvroRowSerializationSchema(SerializationSchema):
+ """
+ Serialization schema that serializes to Avro binary format.
+ """
+
+ def __init__(self, record_class: str = None, avro_schema_string: str = None):
+ """
+ Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
+ record class.
+
+ :param record_class: Avro record class used to serialize Flink's row to Avro's record.
+ :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
+ """
+ if avro_schema_string is None and record_class is None:
+ raise TypeError("record_class or avro_schema_string should be specified.")
+
+ j_serialization_schema = None
+ if record_class is not None:
+ gateway = get_gateway()
+ java_import(gateway.jvm, record_class)
+ j_record_class = load_java_class(record_class)
+ JAvroRowSerializationSchema = get_gateway().jvm \
+ .org.apache.flink.formats.avro.AvroRowSerializationSchema
+ j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
+
+ elif avro_schema_string is not None:
+ JAvroRowSerializationSchema = get_gateway().jvm \
+ .org.apache.flink.formats.avro.AvroRowSerializationSchema
+ j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)
+
+ super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)
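The relocated Avro row schemas plug into connectors exactly as before, e.g. with the Kafka consumer (a sketch mirroring the `kafka_avro_format.py` example updated in this commit; schema, topic, and broker are placeholders):

```python
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema

avro_schema_string = """
{
  "type": "record", "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""
deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=avro_schema_string)
consumer = FlinkKafkaConsumer(
    topics='avro_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'avro_group'})
```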
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index a0d0f803440..377319493ac 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -15,19 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
-from pyflink.common.typeinfo import _from_java_type
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory, \
- StreamFormat
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
+ SerializationSchema, DeserializationSchema
+from pyflink.common.typeinfo import _from_java_type, TypeInformation
+from pyflink.datastream.connectors.file_system import StreamFormat
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
+
+if TYPE_CHECKING:
+ from pyflink.table.types import DataType, RowType, NumericType
__all__ = [
- 'CsvBulkWriter',
- 'CsvReaderFormat',
'CsvSchema',
- 'CsvSchemaBuilder'
+ 'CsvSchemaBuilder',
+ 'CsvReaderFormat',
+ 'CsvBulkWriters',
+ 'CsvRowDeserializationSchema',
+ 'CsvRowSerializationSchema'
]
@@ -39,9 +44,9 @@ class CsvSchema(object):
.. versionadded:: 1.16.0
"""
- def __init__(self, j_schema, row_type: RowType):
+ def __init__(self, j_schema, row_type: 'RowType'):
self._j_schema = j_schema
- self._row_type = row_type # type: RowType
+ self._row_type = row_type
self._type_info = None
@staticmethod
@@ -53,6 +58,7 @@ class CsvSchema(object):
def get_type_info(self):
if self._type_info is None:
+ from pyflink.table.types import _to_java_data_type
jvm = get_gateway().jvm
j_type_info = jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter \
.toLegacyTypeInfo(_to_java_data_type(self._row_type))
@@ -74,7 +80,7 @@ class CsvSchema(object):
class CsvSchemaBuilder(object):
"""
- CsvSchemaBuilder is for building a :class:`CsvSchemaBuilder`, corresponding to Java
+ CsvSchemaBuilder is for building a :class:`~CsvSchema`, corresponding to Java
``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.
.. versionadded:: 1.16.0
@@ -88,14 +94,15 @@ class CsvSchemaBuilder(object):
def build(self) -> 'CsvSchema':
"""
- Build the :class:`CsvSchema`.
+ Build the :class:`~CsvSchema`.
"""
+ from pyflink.table.types import DataTypes
return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))
def add_array_column(self,
name: str,
separator: str = ';',
- element_type: Optional[DataType] = DataTypes.STRING()) \
+ element_type: Optional['DataType'] = None) \
-> 'CsvSchemaBuilder':
"""
Add an array column to the schema; the element type can be specified via ``element_type``,
@@ -105,6 +112,9 @@ class CsvSchemaBuilder(object):
:param separator: Text separator of array elements, default to ``;``.
:param element_type: DataType of array elements, default to ``DataTypes.STRING()``.
"""
+ from pyflink.table.types import DataTypes
+ if element_type is None:
+ element_type = DataTypes.STRING()
self._j_schema_builder.addArrayColumn(name, separator)
self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
return self
@@ -115,12 +125,13 @@ class CsvSchemaBuilder(object):
:param name: Name of the column.
"""
+ from pyflink.table.types import DataTypes
self._j_schema_builder.addBooleanColumn(name)
self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
return self
def add_number_column(self, name: str,
- number_type: Optional[NumericType] = DataTypes.BIGINT()) \
+ number_type: Optional['NumericType'] = None) \
-> 'CsvSchemaBuilder':
"""
Add a number column to the schema; the number type can be specified via ``number_type``.
@@ -128,6 +139,9 @@ class CsvSchemaBuilder(object):
:param name: Name of the column.
:param number_type: DataType of the number, default to ``DataTypes.BIGINT()``.
"""
+ from pyflink.table.types import DataTypes
+ if number_type is None:
+ number_type = DataTypes.BIGINT()
self._j_schema_builder.addNumberColumn(name)
self._fields.append(DataTypes.FIELD(name, number_type))
return self
@@ -138,6 +152,7 @@ class CsvSchemaBuilder(object):
:param name: Name of the column.
"""
+ from pyflink.table.types import DataTypes
self._j_schema_builder.addColumn(name)
self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
return self
@@ -279,7 +294,7 @@ class CsvSchemaBuilder(object):
class CsvReaderFormat(StreamFormat):
"""
- The :class:`StreamFormat` for reading csv files.
+ The :class:`~StreamFormat` for reading csv files.
Example:
::
@@ -310,6 +325,7 @@ class CsvReaderFormat(StreamFormat):
"""
Builds a :class:`CsvReaderFormat` using `CsvSchema`.
"""
+ from pyflink.table.types import _to_java_data_type
jvm = get_gateway().jvm
j_csv_format = jvm.org.apache.flink.formats.csv.PythonCsvUtils \
.createCsvReaderFormat(
@@ -319,10 +335,10 @@ class CsvReaderFormat(StreamFormat):
return CsvReaderFormat(j_csv_format)
-class CsvBulkWriter(object):
+class CsvBulkWriters(object):
"""
- CsvBulkWriter is for building :class:`BulkWriterFactory` to write Rows with a predefined CSV
- schema to partitioned files in a bulk fashion.
+ CsvBulkWriters is for building :class:`~pyflink.common.serialization.BulkWriterFactory` to write
+ Rows with a predefined CSV schema to partitioned files in a bulk fashion.
Example:
::
@@ -334,7 +350,7 @@ class CsvBulkWriter(object):
... .set_column_separator('|') \\
... .build()
>>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+ ... OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
>>> # If ds is a source stream, an identity map before sink is required
>>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
@@ -344,8 +360,11 @@ class CsvBulkWriter(object):
@staticmethod
def for_schema(schema: 'CsvSchema') -> 'BulkWriterFactory':
"""
- Builds a :class:`BulkWriterFactory` for writing records to files in CSV format.
+ Creates a :class:`~pyflink.common.serialization.BulkWriterFactory` for writing records to
+ files in CSV format.
"""
+ from pyflink.table.types import _to_java_data_type
+
jvm = get_gateway().jvm
csv = jvm.org.apache.flink.formats.csv
@@ -353,3 +372,114 @@ class CsvBulkWriter(object):
schema._j_schema,
_to_java_data_type(schema._row_type))
return RowDataBulkWriterFactory(j_factory, schema._row_type)
+
+
+class CsvRowDeserializationSchema(DeserializationSchema):
+ """
+ Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode and
+ converts it to Row.
+
+ Failures during deserialization are forwarded as wrapped IOExceptions.
+ """
+
+ def __init__(self, j_deserialization_schema):
+ super(CsvRowDeserializationSchema, self).__init__(
+ j_deserialization_schema=j_deserialization_schema)
+
+ class Builder(object):
+ """
+ A builder for creating a CsvRowDeserializationSchema.
+ """
+ def __init__(self, type_info: TypeInformation):
+ if type_info is None:
+ raise TypeError("Type information must not be None")
+ self._j_builder = get_gateway().jvm\
+ .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
+ type_info.get_java_type_info())
+
+ def set_field_delimiter(self, delimiter: str):
+ self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
+ return self
+
+ def set_allow_comments(self, allow_comments: bool):
+ self._j_builder = self._j_builder.setAllowComments(allow_comments)
+ return self
+
+ def set_array_element_delimiter(self, delimiter: str):
+ self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+ return self
+
+ def set_quote_character(self, c: str):
+ self._j_builder = self._j_builder.setQuoteCharacter(c)
+ return self
+
+ def set_escape_character(self, c: str):
+ self._j_builder = self._j_builder.setEscapeCharacter(c)
+ return self
+
+ def set_null_literal(self, null_literal: str):
+ self._j_builder = self._j_builder.setNullLiteral(null_literal)
+ return self
+
+ def set_ignore_parse_errors(self, ignore_parse_errors: bool):
+ self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
+ return self
+
+ def build(self):
+ j_csv_row_deserialization_schema = self._j_builder.build()
+ return CsvRowDeserializationSchema(
+ j_deserialization_schema=j_csv_row_deserialization_schema)
+
+
+class CsvRowSerializationSchema(SerializationSchema):
+ """
+ Serialization schema that serializes an object of Flink types into CSV bytes. Serializes the
+ input row into an ObjectNode and converts it into byte[].
+
+ Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
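+
+ Example (a minimal usage sketch; assumes ``Types`` is imported from ``pyflink.common``):
+ ::
+
+ >>> row_type_info = Types.ROW([Types.INT(), Types.STRING()])
+ >>> serialization_schema = CsvRowSerializationSchema.Builder(row_type_info) \\
+ ... .set_field_delimiter(';') \\
+ ... .build()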
+ """
+ def __init__(self, j_csv_row_serialization_schema):
+ super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
+
+ class Builder(object):
+ """
+ A builder for creating a CsvRowSerializationSchema.
+ """
+ def __init__(self, type_info: TypeInformation):
+ if type_info is None:
+ raise TypeError("Type information must not be None")
+ self._j_builder = get_gateway().jvm\
+ .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
+ type_info.get_java_type_info())
+
+ def set_field_delimiter(self, c: str):
+ self._j_builder = self._j_builder.setFieldDelimiter(c)
+ return self
+
+ def set_line_delimiter(self, delimiter: str):
+ self._j_builder = self._j_builder.setLineDelimiter(delimiter)
+ return self
+
+ def set_array_element_delimiter(self, delimiter: str):
+ self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+ return self
+
+ def disable_quote_character(self):
+ self._j_builder = self._j_builder.disableQuoteCharacter()
+ return self
+
+ def set_quote_character(self, c: str):
+ self._j_builder = self._j_builder.setQuoteCharacter(c)
+ return self
+
+ def set_escape_character(self, c: str):
+ self._j_builder = self._j_builder.setEscapeCharacter(c)
+ return self
+
+ def set_null_literal(self, s: str):
+ self._j_builder = self._j_builder.setNullLiteral(s)
+ return self
+
+ def build(self):
+ j_serialization_schema = self._j_builder.build()
+ return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)
diff --git a/flink-python/pyflink/datastream/formats/json.py b/flink-python/pyflink/datastream/formats/json.py
new file mode 100644
index 00000000000..4cc216e28cd
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/json.py
@@ -0,0 +1,150 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import SerializationSchema, TypeInformation, typeinfo, DeserializationSchema
+from pyflink.java_gateway import get_gateway
+
+
+__all__ = [
+ 'JsonRowDeserializationSchema',
+ 'JsonRowSerializationSchema'
+]
+
+
+class JsonRowDeserializationSchema(DeserializationSchema):
+ """
+ Deserialization schema from JSON to Flink types.
+
+ Deserializes a byte[] message as a JSON object and reads the specified fields.
+
+ Failures during deserialization are forwarded as wrapped IOExceptions.
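+
+ Example (a minimal usage sketch; assumes ``Types`` is imported from ``pyflink.common``):
+ ::
+
+ >>> deserialization_schema = JsonRowDeserializationSchema.builder() \\
+ ... .type_info(Types.ROW_NAMED(['id', 'name'], [Types.INT(), Types.STRING()])) \\
+ ... .build()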
+ """
+ def __init__(self, j_deserialization_schema):
+ super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)
+
+ @staticmethod
+ def builder():
+ """
+ A static method to get a Builder for JsonRowDeserializationSchema.
+ """
+ return JsonRowDeserializationSchema.Builder()
+
+ class Builder(object):
+ """
+ Builder for JsonRowDeserializationSchema.
+ """
+
+ def __init__(self):
+ self._type_info = None
+ self._fail_on_missing_field = False
+ self._ignore_parse_errors = False
+
+ def type_info(self, type_info: TypeInformation):
+ """
+ Creates a JSON deserialization schema for the given type information.
+
+ :param type_info: Type information describing the result type. The field names of Row
+ are used to parse the JSON properties.
+ """
+ self._type_info = type_info
+ return self
+
+ def json_schema(self, json_schema: str):
+ """
+ Creates a JSON deserialization schema for the given JSON schema.
+
+ :param json_schema: JSON schema describing the result type.
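+
+ Example (a sketch; the schema string below is hypothetical):
+ ::
+
+ >>> deserialization_schema = JsonRowDeserializationSchema.builder() \\
+ ... .json_schema('{"type": "object", "properties": {"id": {"type": "integer"}}}') \\
+ ... .build()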
+ """
+ if json_schema is None:
+ raise TypeError("The json_schema must not be None.")
+ j_type_info = get_gateway().jvm \
+ .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
+ self._type_info = typeinfo._from_java_type(j_type_info)
+ return self
+
+ def fail_on_missing_field(self):
+ """
+ Configures schema to fail if a JSON field is missing. By default, a missing field is
+ ignored and set to null.
+ """
+ self._fail_on_missing_field = True
+ return self
+
+ def ignore_parse_errors(self):
+ """
+ Configures schema to skip fields and rows with parse errors instead of failing.
+ By default, an exception is thrown when parsing JSON fails.
+ """
+ self._ignore_parse_errors = True
+ return self
+
+ def build(self):
+ JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
+ .Builder
+ j_builder = JBuilder(self._type_info.get_java_type_info())
+
+ if self._fail_on_missing_field:
+ j_builder = j_builder.failOnMissingField()
+
+ if self._ignore_parse_errors:
+ j_builder = j_builder.ignoreParseErrors()
+
+ j_deserialization_schema = j_builder.build()
+ return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
+
+
+class JsonRowSerializationSchema(SerializationSchema):
+ """
+ Serialization schema that serializes an object of Flink types into JSON bytes. Serializes the
+ input Flink object into a JSON string and converts it into byte[].
+
+ Result byte[] messages can be deserialized using JsonRowDeserializationSchema.
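+
+ Example (a minimal usage sketch; assumes ``Types`` is imported from ``pyflink.common``):
+ ::
+
+ >>> serialization_schema = JsonRowSerializationSchema.builder() \\
+ ... .with_type_info(Types.ROW_NAMED(['id', 'name'], [Types.INT(), Types.STRING()])) \\
+ ... .build()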
+ """
+
+ def __init__(self, j_serialization_schema):
+ super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)
+
+ @staticmethod
+ def builder():
+ return JsonRowSerializationSchema.Builder()
+
+ class Builder(object):
+ """
+ Builder for JsonRowSerializationSchema.
+ """
+ def __init__(self):
+ self._type_info = None
+
+ def with_type_info(self, type_info: TypeInformation):
+ """
+ Creates a JSON serialization schema for the given type information.
+
+ :param type_info: Type information describing the result type. The field names of Row
+ are used as the JSON property names.
+ """
+ self._type_info = type_info
+ return self
+
+ def build(self):
+ if self._type_info is None:
+ raise TypeError("Typeinfo should be set.")
+
+ j_builder = get_gateway().jvm \
+ .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()
+
+ j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
+ return JsonRowSerializationSchema(j_serialization_schema=j_schema)
diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py
index a34093d8fa7..459690dadbe 100644
--- a/flink-python/pyflink/datastream/formats/orc.py
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -15,15 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
from pyflink.datastream.utils import create_hadoop_configuration, create_java_properties
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_data_type, RowType
from pyflink.util.java_utils import to_jarray
+if TYPE_CHECKING:
+ from pyflink.table.types import RowType
+
__all__ = [
'OrcBulkWriters'
]
@@ -31,20 +33,20 @@ __all__ = [
class OrcBulkWriters(object):
"""
- Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
- Row records with a defined RowType into Orc files in a batch fashion.
+ Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+ writes records with a predefined schema into Orc files in a batch fashion.
.. versionadded:: 1.16.0
"""
@staticmethod
- def for_row_type(row_type: RowType,
+ def for_row_type(row_type: 'RowType',
writer_properties: Optional[Configuration] = None,
hadoop_config: Optional[Configuration] = None) \
-> BulkWriterFactory:
"""
- Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc
- files in a batch fashion.
+ Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
+ with a predefined schema into Orc files in a batch fashion.
Example:
::
@@ -69,10 +71,16 @@ class OrcBulkWriters(object):
Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
because RowDataBulkWriterFactory assumes the input record type is Row.
+
+ :param row_type: Row type of the Orc table.
+ :param writer_properties: Properties that can be used in ORC WriterOptions.
+ :param hadoop_config: Hadoop configurations used in ORC WriterOptions.
"""
+ from pyflink.table.types import RowType
if not isinstance(row_type, RowType):
raise TypeError('row_type must be an instance of RowType')
+ from pyflink.table.types import _to_java_data_type
j_data_type = _to_java_data_type(row_type)
jvm = get_gateway().jvm
j_row_type = j_data_type.getLogicalType()
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index 5d1164b82b8..83aef589f9f 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -15,22 +15,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import StreamFormat, BulkFormat, BulkWriterFactory, \
- RowDataBulkWriterFactory
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.datastream.connectors.file_system import StreamFormat, BulkFormat
from pyflink.datastream.formats.avro import AvroSchema
from pyflink.datastream.utils import create_hadoop_configuration
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import RowType, _to_java_data_type
+if TYPE_CHECKING:
+ from pyflink.table.types import RowType
__all__ = [
'AvroParquetReaders',
'AvroParquetWriters',
- 'ParquetBulkWriter',
'ParquetColumnarRowInputFormat',
+ 'ParquetBulkWriters'
]
@@ -142,7 +143,7 @@ class ParquetColumnarRowInputFormat(BulkFormat):
"""
def __init__(self,
- row_type: RowType,
+ row_type: 'RowType',
hadoop_config: Optional[Configuration] = None,
batch_size: int = 2048,
is_utc_timestamp: bool = False,
@@ -150,6 +151,7 @@ class ParquetColumnarRowInputFormat(BulkFormat):
if not hadoop_config:
hadoop_config = Configuration()
+ from pyflink.table.types import _to_java_data_type
jvm = get_gateway().jvm
j_row_type = _to_java_data_type(row_type).getLogicalType()
produced_type_info = jvm.org.apache.flink.table.runtime.typeutils. \
@@ -161,20 +163,21 @@ class ParquetColumnarRowInputFormat(BulkFormat):
super().__init__(j_parquet_columnar_format)
-class ParquetBulkWriter(object):
+class ParquetBulkWriters(object):
"""
- Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined
- RowType into Parquet files in a batch fashion.
+ Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+ writes records with a predefined schema into Parquet files in a batch fashion.
.. versionadded:: 1.16.0
"""
@staticmethod
- def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] = None,
+ def for_row_type(row_type: 'RowType',
+ hadoop_config: Optional[Configuration] = None,
utc_timestamp: bool = False) -> 'BulkWriterFactory':
"""
- Create a RowDataBulkWriterFactory that writes Rows records with a defined RowType into
- Parquet files in a batch fashion.
+ Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
+ with a predefined schema into Parquet files in a batch fashion.
Example:
::
@@ -188,7 +191,7 @@ class ParquetBulkWriter(object):
... [Types.STRING(), Types.LIST(Types.INT())]
... )
>>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ ... OUTPUT_DIR, ParquetBulkWriters.for_row_type(
... row_type,
... hadoop_config=Configuration(),
... utc_timestamp=True,
@@ -198,11 +201,17 @@ class ParquetBulkWriter(object):
Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, because
- RowDataBulkWriterFactory assumes the input record type is **Row** .
+ RowDataBulkWriterFactory assumes the input record type is **Row**.
+
+ :param row_type: Row type of the Parquet table.
+ :param hadoop_config: Hadoop configurations.
+ :param utc_timestamp: Whether to use UTC timezone or local timezone for the conversion
+ between epoch time and LocalDateTime.
"""
if not hadoop_config:
hadoop_config = Configuration()
+ from pyflink.table.types import _to_java_data_type
jvm = get_gateway().jvm
JParquetRowDataBuilder = jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
return RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
diff --git a/flink-python/pyflink/datastream/formats/tests/test_avro.py b/flink-python/pyflink/datastream/formats/tests/test_avro.py
index 01e7c1ea66a..bbc1d4c932b 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_avro.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_avro.py
@@ -26,8 +26,8 @@ from py4j.java_gateway import JavaObject, java_import
from pyflink.datastream import MapFunction
from pyflink.datastream.connectors.file_system import FileSink
-from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
- AvroInputFormat
+from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, \
+ AvroBulkWriters, AvroInputFormat
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
@@ -144,7 +144,7 @@ class FileSinkAvroWritersTests(PyFlinkStreamingTestCase):
def _build_avro_job(self, schema, objects):
ds = self.env.from_collection(objects)
sink = FileSink.for_bulk_format(
- self.avro_dir_name, AvroWriters.for_generic_record(schema)
+ self.avro_dir_name, AvroBulkWriters.for_generic_record(schema)
).build()
ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)).sink_to(sink)
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index e4b14a9ec35..c2756923446 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -23,10 +23,12 @@ from typing import Tuple, List
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import MapFunction
from pyflink.datastream.connectors.file_system import FileSource, FileSink
-from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriter
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriters, \
+ CsvRowSerializationSchema, CsvRowDeserializationSchema
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
+from pyflink.java_gateway import get_gateway
from pyflink.table import DataTypes
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, PyFlinkTestCase
class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
@@ -163,7 +165,7 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
).build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
sink = FileSink.for_bulk_format(
- self.csv_dir_name, CsvBulkWriter.for_schema(schema)
+ self.csv_dir_name, CsvBulkWriters.for_schema(schema)
).build()
ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
@@ -175,6 +177,54 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
return lines
+class CsvRowSerializationSchemaTests(PyFlinkTestCase):
+
+ def test_csv_row_serialization_schema(self):
+ jvm = get_gateway().jvm
+ JRow = jvm.org.apache.flink.types.Row
+
+ j_row = JRow(3)
+ j_row.setField(0, "BEGIN")
+ j_row.setField(2, "END")
+
+ def field_assertion(field_info, csv_value, value, field_delimiter):
+ row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
+ expected_csv = "BEGIN" + field_delimiter + csv_value + field_delimiter + "END\n"
+ j_row.setField(1, value)
+
+ csv_row_serialization_schema = CsvRowSerializationSchema.Builder(row_info)\
+ .set_escape_character('*').set_quote_character('\'')\
+ .set_array_element_delimiter(':').set_field_delimiter(';').build()
+ csv_row_deserialization_schema = CsvRowDeserializationSchema.Builder(row_info)\
+ .set_escape_character('*').set_quote_character('\'')\
+ .set_array_element_delimiter(':').set_field_delimiter(';').build()
+ csv_row_serialization_schema._j_serialization_schema.open(
+ jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+ csv_row_deserialization_schema._j_deserialization_schema.open(
+ jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+
+ serialized_bytes = csv_row_serialization_schema._j_serialization_schema.serialize(j_row)
+ self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))
+
+ j_deserialized_row = csv_row_deserialization_schema._j_deserialization_schema\
+ .deserialize(expected_csv.encode("utf-8"))
+ self.assertTrue(j_row.equals(j_deserialized_row))
+
+ field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
+ field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
+ field_assertion(Types.INT(), "12", 12, ";")
+
+ test_j_row = JRow(2)
+ test_j_row.setField(0, "1")
+ test_j_row.setField(1, "hello")
+
+ field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello'", test_j_row, ";")
+ test_j_row.setField(1, "hello world")
+ field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello world'", test_j_row,
+ ";")
+ field_assertion(Types.STRING(), "null", "null", ";")
+
+
class PassThroughMapFunction(MapFunction):
def map(self, value):
diff --git a/flink-python/pyflink/datastream/formats/tests/test_json.py b/flink-python/pyflink/datastream/formats/tests/test_json.py
new file mode 100644
index 00000000000..e8fc02bda05
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/tests/test_json.py
@@ -0,0 +1,57 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import Types
+from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class JsonSerializationSchemaTests(PyFlinkTestCase):
+
+ def test_json_row_serialization_deserialization_schema(self):
+ jvm = get_gateway().jvm
+ jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\"}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\", "
+ "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},\"ids\":[1, 2, 3]}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}"]
+ expected_jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\","
+ "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
+ "\"ids\":[1,2,3]}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}"]
+
+ row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
+ [Types.STRING(),
+ Types.ROW_NAMED(['id'], [Types.STRING()]),
+ Types.PRIMITIVE_ARRAY(Types.INT())])
+
+ json_row_serialization_schema = JsonRowSerializationSchema.builder() \
+ .with_type_info(row_schema).build()
+ json_row_deserialization_schema = JsonRowDeserializationSchema.builder() \
+ .type_info(row_schema).build()
+ json_row_serialization_schema._j_serialization_schema.open(
+ jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+ json_row_deserialization_schema._j_deserialization_schema.open(
+ jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+
+ for i in range(len(jsons)):
+ j_row = json_row_deserialization_schema._j_deserialization_schema\
+ .deserialize(bytes(jsons[i], encoding='utf-8'))
+ result = str(json_row_serialization_schema._j_serialization_schema.serialize(j_row),
+ encoding='utf-8')
+ self.assertEqual(expected_jsons[i], result)
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index 16bf47edbfe..d7ee2cdd30e 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -44,7 +44,7 @@ from pyflink.datastream.formats.tests.test_avro import \
_create_basic_avro_schema_and_records, _import_avro_classes
from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
from pyflink.datastream.formats.parquet import AvroParquetReaders, ParquetColumnarRowInputFormat, \
- AvroParquetWriters, ParquetBulkWriter
+ AvroParquetWriters, ParquetBulkWriters
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.datastream.utils import create_hadoop_configuration
from pyflink.java_gateway import get_gateway
@@ -270,7 +270,7 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
conversion_type_info: Optional[RowTypeInfo] = None,
):
sink = FileSink.for_bulk_format(
- self.parquet_dir_name, ParquetBulkWriter.for_row_type(row_type, utc_timestamp=True)
+ self.parquet_dir_name, ParquetBulkWriters.for_row_type(row_type, utc_timestamp=True)
).build()
ds = self.env.from_collection(data, type_info=row_type_info)
if conversion_type_info:
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 97bce229b04..31cfaf38bd1 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -18,12 +18,13 @@
import os
import tempfile
-from typing import List, Any, Optional, cast, TYPE_CHECKING
+from typing import List, Any, Optional, cast
from py4j.java_gateway import JavaObject
from pyflink.common import Configuration, WatermarkStrategy
from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.io import InputFormat
from pyflink.common.job_client import JobClient
from pyflink.common.job_execution_result import JobExecutionResult
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
@@ -43,8 +44,6 @@ from pyflink.serializers import PickleSerializer
from pyflink.util.java_utils import load_java_class, add_jars_to_context_class_loader, \
invoke_method, get_field_value, is_local_deployment, get_j_env_configuration
-if TYPE_CHECKING:
- from pyflink.datastream.connectors.file_system import InputFormat
__all__ = ['StreamExecutionEnvironment']
@@ -837,7 +836,7 @@ class StreamExecutionEnvironment(object):
return StreamExecutionEnvironment(j_stream_exection_environment)
- def create_input(self, input_format: 'InputFormat',
+ def create_input(self, input_format: InputFormat,
type_info: Optional[TypeInformation] = None):
"""
Create an input data stream with InputFormat.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index c80ab5f7d9b..1d3a94d3d3a 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -27,13 +27,13 @@ import unittest
import uuid
from pyflink.common import Configuration, ExecutionConfig, RestartStrategies
-from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
CheckpointingMode, MemoryStateBackend, TimeCharacteristic,
SlotSharingGroup)
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.execution_mode import RuntimeExecutionMode
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.functions import SourceFunction
from pyflink.datastream.slot_sharing_group import MemorySize
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
diff --git a/flink-python/pyflink/datastream/utils.py b/flink-python/pyflink/datastream/utils.py
index efbdd1b5da1..caad21f88f0 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -34,15 +34,6 @@ class ResultTypeQueryable(object):
pass
-class JavaObjectWrapper(object):
-
- def __init__(self, j_object):
- self._j_object = j_object
-
- def get_java_object(self):
- return self._j_object
-
-
def create_hadoop_configuration(config: Configuration):
jvm = get_gateway().jvm
hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index dd3711aff90..2a66bc4e8b1 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -18,9 +18,10 @@
import logging
import sys
-from pyflink.common import AvroRowSerializationSchema, Types, AvroRowDeserializationSchema
+from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 99d5d62e90c..4dbb243fcf9 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -18,9 +18,11 @@
import logging
import sys
-from pyflink.common import Types, JsonRowDeserializationSchema, CsvRowSerializationSchema
+from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.csv import CsvRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index cf650fdb731..3cae241ba43 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -18,9 +18,10 @@
import logging
import sys
-from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
+from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is