Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/22 06:44:50 UTC

[GitHub] [pulsar] jerrypeng opened a new pull request #9275: Enable Function Workers to use exclusive producer to write to internal topics

jerrypeng opened a new pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275


   
   ### Motivation
   
   To maintain correctness, we need to use an exclusive producer to write to the internal topics that Pulsar Functions uses to manage metadata / state.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#discussion_r563441114



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -343,4 +349,47 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String topic,
+                                                                    String producerName) {
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating exclusive producer for topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .accessMode(ProducerAccessMode.Exclusive)

Review comment:
       I don't think it is a good idea for "liveness" if we block forever, which may cause deadlocks.  While blocking, the worker might not even be the leader anymore.  I think we should keep retrying and periodically check if we are still the leader.
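
A minimal sketch of the retry-and-recheck idea described in the comment above, using only the JDK. `LeaderAwareRetry`, `retryWhileLeader`, and the `isLeader` check are hypothetical names chosen for illustration; they are not part of this PR or of the Pulsar API, and the `action` argument merely stands in for the producer creation call.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not from the PR): retries an action only while this worker is still the leader. */
final class LeaderAwareRetry {

    private LeaderAwareRetry() {}

    /**
     * Runs {@code action} until it succeeds, leadership is lost, or {@code maxAttempts} is used up.
     * Returns an empty Optional when it gives up.
     */
    static <T> Optional<T> retryWhileLeader(Callable<T> action,
                                            BooleanSupplier isLeader,
                                            int maxAttempts,
                                            long sleepBetweenAttemptsMs) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Re-check leadership before every attempt instead of blocking indefinitely.
            if (!isLeader.getAsBoolean()) {
                return Optional.empty();
            }
            try {
                return Optional.of(action.call());
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // Assumed failure case: another producer still holds the topic. Wait, then retry.
                if (attempt < maxAttempts) {
                    Thread.sleep(sleepBetweenAttemptsMs);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Stand-in for producer creation: fails twice, then succeeds.
        Optional<String> producer = retryWhileLeader(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("topic is held by another producer");
            }
            return "producer";
        }, () -> true, 5, 10L);
        System.out.println(producer.isPresent() + " after " + calls[0] + " attempts");
    }
}
```

The caller decides what an empty result means: either leadership was lost while waiting or the attempts ran out, so in both cases the worker should not go on to write to the internal topic.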





[GitHub] [pulsar] eolivelli commented on a change in pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#discussion_r563795240



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -343,4 +349,47 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String topic,
+                                                                    String producerName) {
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating exclusive producer for topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .accessMode(ProducerAccessMode.Exclusive)

Review comment:
       I agree with @jerrypeng 





[GitHub] [pulsar] eolivelli commented on pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#issuecomment-766882311


   What happens if the broker is still on an old version that does not support ExclusiveProducer mode ?




[GitHub] [pulsar] jerrypeng commented on pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#issuecomment-780097010


   @eolivelli 
   
   > What happens if the broker is still on an old version that does not support ExclusiveProducer mode ?
   
   If the broker hasn't been updated and does not support exclusive producing mode, the client can still connect and produce, but exclusivity is not guaranteed.




[GitHub] [pulsar] jerrypeng commented on a change in pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#discussion_r563441667



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -343,4 +349,47 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String topic,
+                                                                    String producerName) {
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating exclusive producer for topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .accessMode(ProducerAccessMode.Exclusive)
+                                .enableBatching(false)

Review comment:
       I do not.  We could enable it but I wouldn't expect much benefit as we are not exactly optimizing for metadata operation throughput.  For debugging it is a little easier without batching as you will know exactly how many messages are in the internal topics.





[GitHub] [pulsar] eolivelli commented on pull request #9275: WIP Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#issuecomment-780097636


   Thanks.  Makes sense



[GitHub] [pulsar] jerrypeng merged pull request #9275: Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
jerrypeng merged pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275


   



[GitHub] [pulsar] merlimat commented on a change in pull request #9275: Enable Function Workers to use exclusive producer to write to internal topics

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #9275:
URL: https://github.com/apache/pulsar/pull/9275#discussion_r562975085



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -343,4 +349,47 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String topic,
+                                                                    String producerName) {
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating exclusive producer for topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .accessMode(ProducerAccessMode.Exclusive)

Review comment:
       Would it make sense to use `WaitForExclusive` here? So that the producer instance will stay pending until it becomes the leader?

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
##########
@@ -343,4 +349,47 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String topic,
+                                                                    String producerName) {
+        Actions.Action createProducerAction = Actions.Action.builder()
+                .actionName(String.format("Creating exclusive producer for topic %s", topic))
+                .numRetries(5)
+                .sleepBetweenInvocationsMs(10000)
+                .supplier(() -> {
+                    try {
+                        Producer<byte[]> producer = client.newProducer().topic(topic)
+                                .accessMode(ProducerAccessMode.Exclusive)
+                                .enableBatching(false)

Review comment:
       Do you remember if there was any reason for not using batching here?



