You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 09:56:14 UTC

[GitHub] [flink] Vancior opened a new pull request, #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Vancior opened a new pull request, #20263:
URL: https://github.com/apache/flink/pull/20263

   
   
   ## What is the purpose of the change
   
   This PR introduces KafkaSink API in PyFlink, aligning the Java new KafkaSink API.
   
   
   ## Brief change log
   
   - add KafkaSink, KafkaSinkBuilder, KafkaRecordSerializationSchema, KafkaRecordSerializationSchemaBuilder API
   - add optional transformation for Sink, to support selecting the sink topic for each record as additional row info in Python and then do selection in Java KafkaSink
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - `KafkaSinkTests` in test_kafka.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs & Python Sphinx doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] bgeng777 commented on a diff in pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r930839985


##########
flink-python/pyflink/datastream/connectors/kafka.py:
##########
@@ -816,3 +826,340 @@ def offsets(offsets: Dict['KafkaTopicPartition', int],
             enumerator.initializer.OffsetsInitializer
         return KafkaOffsetsInitializer(JOffsetsInitializer.offsets(
             j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy()))
+
+
+class KafkaSink(Sink, SupportPreprocessing):
+    """
+    Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees
+    described by :class:`DeliveryGuarantee`.
+
+    * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case
+      of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
+    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the
+      Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be
+      lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink
+      restarts.
+    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in
+      a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer
+      reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates
+      will be seen in case of a Flink restart. However, this delays record writing effectively
+      until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure
+      that you use unique transactional id prefixes across your applications running on the same
+      Kafka cluster such that multiple running jobs do not interfere in their transactions!
+      Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum
+      checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an
+      uncommitted transaction.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_kafka_sink, preprocessing: TransformAppender = None):
+        super().__init__(j_kafka_sink)
+        self._preprocessing = preprocessing
+
+    @staticmethod
+    def builder() -> 'KafkaSinkBuilder':
+        """
+        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
+        """
+        return KafkaSinkBuilder()
+
+    def need_preprocessing(self) -> bool:
+        return self._preprocessing is not None
+
+    def get_preprocessing(self) -> TransformAppender:
+        return self._preprocessing
+
+
+class KafkaSinkBuilder(object):
+    """
+    Builder to construct :class:`KafkaSink`.
+
+    The following example shows the minimum setup to create a KafkaSink that writes String values
+    to a Kafka topic.
+
+    ::
+
+        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic(MY_SINK_TOPIC) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+        >>> sink = KafkaSink.builder() \\
+        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
+        ...     .set_record_serializer(record_serializer) \\
+        ...     .build()
+
+    One can also configure different :class:`DeliveryGuarantee` by using
+    :meth:`set_delivery_guarantee` but keep in mind when using
+    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix
+    :meth:`set_transactional_id_prefix`.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
+        self._preprocessing = None
+
+    def build(self) -> 'KafkaSink':
+        """
+        Constructs the :class:`KafkaSink` with the configured properties.
+        """
+        if self._preprocessing is None:
+            return KafkaSink(self._j_builder.build())
+        else:
+            return KafkaSink(self._j_builder.build(), self._preprocessing)

Review Comment:
   directly `return KafkaSink(self._j_builder.build(), self._preprocessing)` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangXingBo closed pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Posted by GitBox <gi...@apache.org>.
HuangXingBo closed pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API
URL: https://github.com/apache/flink/pull/20263


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Vancior commented on a diff in pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r930546050


##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:

Review Comment:
   This is still needed that a sink implementing `SupportsPreprocessing` may not require preprocessing, e.g. `KafkaSink` doesn't need preprocessing when using fixed topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20263:
URL: https://github.com/apache/flink/pull/20263#issuecomment-1183022198

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e724adefe34440e7d9d03fda53bd0d55463c06ec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e724adefe34440e7d9d03fda53bd0d55463c06ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e724adefe34440e7d9d03fda53bd0d55463c06ec UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dianfu commented on a diff in pull request #20263: [FLINK-28510][python][connector] Support using new KafkaSink API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r923115751


##########
docs/layouts/shortcodes/py_download_link.html:
##########
@@ -19,12 +19,68 @@
 Generates the PyFlink download connector page.
 */}}
 {{ $name := .Get 0 }}
-{{ $text := .Get 1 }}
 {{ $connectors := .Site.Data.sql_connectors }}
 {{ $connector := index $connectors $name }}
 
-<p>In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
-dependencies are required: <a href="{{- partial "docs/interpolate" $connector.sql_url -}}">{{ $text }}</a>.
+<p>
+{{ if eq $.Site.Language.Lang "en" }}
+In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
+dependencies are required:
+{{ else if eq $.Site.Language.Lang "zh" }}
+为了在 PyFlink 作业中使用 {{ $connector.name }} {{ $connector.category }} ,需要添加下列依赖:
+{{ end }}
+{{ if eq $connector.versions nil }}
+<table>
+    <thead>
+    <th style="text-align:left">SQL Client JAR</th>

Review Comment:
   ```suggestion
       <th style="text-align:left">PyFlink JAR</th>
   ```



##########
flink-python/pyflink/common/types.py:
##########
@@ -277,3 +290,33 @@ def __iter__(self):
 
     def __len__(self):
         return len(self._values)
+
+    def to_java_row(self):

Review Comment:
   What about convert it to a utility method and move it into java_utils?



##########
docs/layouts/shortcodes/py_download_link.html:
##########
@@ -19,12 +19,68 @@
 Generates the PyFlink download connector page.
 */}}
 {{ $name := .Get 0 }}
-{{ $text := .Get 1 }}
 {{ $connectors := .Site.Data.sql_connectors }}
 {{ $connector := index $connectors $name }}
 
-<p>In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
-dependencies are required: <a href="{{- partial "docs/interpolate" $connector.sql_url -}}">{{ $text }}</a>.
+<p>
+{{ if eq $.Site.Language.Lang "en" }}
+In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
+dependencies are required:
+{{ else if eq $.Site.Language.Lang "zh" }}
+为了在 PyFlink 作业中使用 {{ $connector.name }} {{ $connector.category }} ,需要添加下列依赖:
+{{ end }}
+{{ if eq $connector.versions nil }}
+<table>
+    <thead>
+    <th style="text-align:left">SQL Client JAR</th>
+    </thead>
+    <tbody>
+    <tr>
+        {{ if eq $connector.builtin true }}
+        <td style="text-align: left">Built-in</td>
+        {{ else if $.Site.Params.IsStable }}
+        {{ if eq $connector.sql_url nil }}
+        <td style="text-align:left">There is not sql jar available yet.</td>

Review Comment:
   ```suggestion
           <td style="text-align:left">There is no sql jar available yet.</td>
   ```



##########
flink-python/pyflink/common/types.py:
##########
@@ -39,6 +41,17 @@ def __str__(self):
         else:
             return '-D'
 
+    def to_j_row_kind(self):
+        JRowKind = get_gateway().jvm.org.apache.flink.types.RowKind
+        if self.value == RowKind.INSERT.value:
+            return JRowKind.INSERT
+        elif self.value == RowKind.UPDATE_BEFORE.value:
+            return JRowKind.UPDATE_BEFORE
+        elif self.value == RowKind.UPDATE_AFTER.value:
+            return JRowKind.UPDATE_AFTER
+        else:
+            return JRowKind.DELETE

Review Comment:
   Could be simplified as following:
   ```
   JRowKind = get_gateway().jvm.org.apache.flink.types.RowKind
   return getattr(JRowKind, self.name)
   ```



##########
flink-python/pyflink/common/serialization.py:
##########
@@ -38,6 +40,10 @@ class SerializationSchema(object):
     def __init__(self, j_serialization_schema=None):
         self._j_serialization_schema = j_serialization_schema
 
+    @abstractmethod
+    def require_row_type(self) -> bool:

Review Comment:
   Seems a little wired, what about removing this method?



##########
docs/content.zh/docs/connectors/datastream/formats/csv.md:
##########
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
 </dependency>
 ```
 
+{{< py_download_link "csv" >}}

Review Comment:
   csv is built-in supported and so I think this is unnecessary.



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:
+        pass
+
+    @abstractmethod
+    def get_pre_transform(self) -> 'TransformAppender':

Review Comment:
   apply(self, ds) -> 'ds'?



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):

Review Comment:
   Rename it to SupportsPreprocessing?



##########
flink-python/pyflink/datastream/__init__.py:
##########
@@ -166,6 +166,10 @@
       A streaming data source that pulls a parallel data stream from Apache Kafka.
     - :class:`connectors.FlinkKafkaProducer`:
       A streaming data sink to produce data into a Kafka topic.
+    - :class:`connectors.KafkaSource`:
+      The new API to read data in parallel from Apache Kafka.
+    - :class:`connectors.FlinkKafkaProducer`:

Review Comment:
   Duplicate with line 167?



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:

Review Comment:
   remove this method?



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {
+        return (T)
+                Proxy.newProxyInstance(
+                        clazz.getClassLoader(),
+                        new Class[] {clazz},
+                        new FirstColumnSelectorInvocationHandler());
+    }
+
+    /** The serializable {@link InvocationHandler} as the proxy for first column selector. */
+    public static class FirstColumnSelectorInvocationHandler
+            implements InvocationHandler, Serializable {
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            Preconditions.checkArgument(method.getName().equals("apply"));
+            Preconditions.checkArgument(args.length == 1);
+            Preconditions.checkArgument(args[0] instanceof Row);
+            Row row = (Row) args[0];
+            Preconditions.checkArgument(row.getArity() >= 1);
+            return row.getField(0);
+        }
+    }
+
+    /**
+     * A {@link SerializationSchema} for {@link Row} that only serialize the second column using a
+     * wrapped {@link SerializationSchema} for {@link T}.
+     *
+     * @param <T> The actual data type wrapped in the Row.
+     */
+    public static class SecondColumnSerializationSchema<T> implements SerializationSchema<Row> {
+

Review Comment:
   Add `private static final long serialVersionUID = 1L`;



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {

Review Comment:
   ```suggestion
       public static <T> T createFirstColumnTopicSelector(Class<T> clazz) {
   ```



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {
+        return (T)
+                Proxy.newProxyInstance(
+                        clazz.getClassLoader(),
+                        new Class[] {clazz},
+                        new FirstColumnSelectorInvocationHandler());
+    }
+
+    /** The serializable {@link InvocationHandler} as the proxy for first column selector. */
+    public static class FirstColumnSelectorInvocationHandler

Review Comment:
   ```suggestion
       public static class FirstColumnTopicSelectorInvocationHandler
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org