You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/20 07:38:41 UTC

[GitHub] [flink] fapaul commented on a diff in pull request #19473: [FLINK-27199][Connector/Pulsar] Bump pulsar to 2.10.0

fapaul commented on code in PR #19473:
URL: https://github.com/apache/flink/pull/19473#discussion_r853812193


##########
docs/layouts/shortcodes/generated/pulsar_producer_configuration.html:
##########
@@ -56,18 +56,6 @@
             <td>Long</td>
             <td>The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.producer.maxPendingMessages</h5></td>

Review Comment:
   Please also use a separate commit for the restructuring of the docs if they are unrelated to the version bump.



##########
docs/layouts/shortcodes/generated/pulsar_client_configuration.html:
##########
@@ -100,7 +100,7 @@
         </tr>
         <tr>
             <td><h5>pulsar.client.memoryLimitBytes</h5></td>
-            <td style="word-wrap: break-word;">0</td>

Review Comment:
   I think this change deserves at least a separate commit. Is this changing the behavior or correcting the docs?



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java:
##########
@@ -128,6 +127,16 @@ private PulsarSinkOptions() {
                             "The allowed transaction recommit times if we meet some retryable exception."
                                     + " This is used in Pulsar Transaction.");
 
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of pending messages in on sink parallelism.")

Review Comment:
   Can you explain this configuration value? The current description is hard to understand.



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java:
##########
@@ -50,22 +49,10 @@ static PulsarRuntime mock() {
         return new PulsarMockRuntime();
     }
 
-    /**
-     * Create a standalone Pulsar instance in test thread. We would start an embedded zookeeper and
-     * bookkeeper. The stream storage for bookkeeper is disabled. The function worker is disabled on
-     * Pulsar broker.
-     *
-     * <p>This runtime would be faster than {@link #container()} and behaves the same as the {@link
-     * #container()}.
-     */
-    static PulsarRuntime embedded() {

Review Comment:
   Has this something to do with the version bump? If not, please use a separate commit to remove the embedded test environment.



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java:
##########
@@ -128,6 +127,16 @@ private PulsarSinkOptions() {
                             "The allowed transaction recommit times if we meet some retryable exception."
                                     + " This is used in Pulsar Transaction.");
 
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =

Review Comment:
   Can you add this configuration on a separate commit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org