Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/14 14:15:33 UTC

[GitHub] [flink] syhily opened a new pull request, #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

syhily opened a new pull request, #21074:
URL: https://github.com/apache/flink/pull/21074

   ## What is the purpose of the change
   
   `PulsarSink` is slow when the delivery guarantee is `At-Least-Once` or `Exactly-Once`, because it uses `MailboxExecutor` to write messages. This writing policy causes many unnecessary context switches and lock acquisitions. We should drop `MailboxExecutor` and send messages directly for better performance.
   
   ## Brief change log
   
     - Deprecate the `PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM` option, which is no longer needed because the Pulsar client has its own memory-based send rate limit.
     - Remove the use of `MailboxExecutor` in `PulsarWriter`.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *PulsarWriterTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tisonkun commented on a diff in pull request #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21074:
URL: https://github.com/apache/flink/pull/21074#discussion_r998038872


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java:
##########
@@ -151,42 +146,23 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
             // We would just ignore the sending exception. This may cause data loss.
             builder.sendAsync();
         } else {
-            // Waiting for permits to write message.
-            requirePermits();
-            mailboxExecutor.execute(
-                    () -> enqueueMessageSending(topic, builder),
-                    "Failed to send message to Pulsar");
-        }
-    }
-
-    private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
-            throws ExecutionException, InterruptedException {
-        // Block the mailbox executor for yield method.
-        builder.sendAsync()
-                .whenComplete(
-                        (id, ex) -> {
-                            this.releasePermits();
-                            if (ex != null) {
-                                throw new FlinkRuntimeException(
-                                        "Failed to send data to Pulsar " + topic, ex);
-                            } else {
-                                LOG.debug(
-                                        "Sent message to Pulsar {} with message id {}", topic, id);
-                            }
-                        })
-                .get();
-    }
-
-    private void requirePermits() throws InterruptedException {
-        while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
-            LOG.info("Waiting for the available permits.");
-            mailboxExecutor.yield();
+            // Increase the pending message count.
+            pendingMessages.incrementAndGet();
+            builder.sendAsync()
+                    .whenComplete(
+                            (id, ex) -> {
+                                pendingMessages.decrementAndGet();
+                                if (ex != null) {
+                                    throw new FlinkRuntimeException(
+                                            "Failed to send data to Pulsar " + topic, ex);

Review Comment:
   We could handle exceptions in the mailbox executor, as `KafkaWriter` does. Otherwise, an exception thrown inside this `whenComplete` callback isn't handled by any thread.
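
To illustrate the concern, here is a minimal sketch using only `java.util.concurrent`. It is not the `PulsarWriter` or `KafkaWriter` code: the class `CallbackFailureSketch` and its methods are hypothetical names, and recording the failure for a later check on the writer thread is a simplified stand-in for handing it to the mailbox executor. `throwInCallbackIsCaptured` shows that an exception thrown inside `whenComplete` only fails the derived future and never reaches the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; names are illustrative and not part of Flink or Pulsar. */
public class CallbackFailureSketch {

    /** Holds the first asynchronous send failure, if any. */
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Registers a callback that records a failure instead of throwing it. */
    public void track(CompletableFuture<String> sendFuture) {
        sendFuture.whenComplete((id, ex) -> {
            if (ex != null) {
                // Keep only the first failure; later ones are ignored.
                firstFailure.compareAndSet(null, ex);
            }
        });
    }

    /** Meant to be called on the writer thread, which can fail the job. */
    public void checkFailure() {
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Failed to send data", failure);
        }
    }

    /** Throwing inside whenComplete only completes the derived future exceptionally. */
    public static boolean throwInCallbackIsCaptured() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> derived = source.whenComplete((id, ex) -> {
            throw new RuntimeException("thrown in callback");
        });
        // The callback runs during complete(), but its exception does not propagate here.
        source.complete("message-id");
        return derived.isCompletedExceptionally();
    }
}
```

Because the future returned by `whenComplete` is discarded in the diff above, nothing would ever observe that failure. Surfacing it on a thread that the runtime monitors avoids silent data loss.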





[GitHub] [flink] flinkbot commented on pull request #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21074:
URL: https://github.com/apache/flink/pull/21074#issuecomment-1279082363

   ## CI report:
   
   * 6004175f2aa10d2d805a3a575e5efa0954482d1d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] tisonkun merged pull request #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #21074:
URL: https://github.com/apache/flink/pull/21074

