You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/08 12:36:30 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #11597: Use `get` instead of `join` to avoid getting stuck

315157973 opened a new pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597


   ### Motivation
    join() will not time out, when network jitter occurs, it is easy to cause the thread to get stuck forever
   
   
   ### Modifications
   Use get(timeout) instead of join() to avoid getting stuck


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#discussion_r684788645



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
##########
@@ -258,7 +261,12 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
         CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(bundle);
 
         if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
-            return future.join();
+            try {
+                return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Get owned bundle failed ", e);
+                return null;

Review comment:
       Nice view, done

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
##########
@@ -122,11 +122,17 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
-                retryLetterTopic = oldRetryLetterTopic;
-            }
-            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
-                deadLetterTopic = oldDeadLetterTopic;
+            try {
+                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    retryLetterTopic = oldRetryLetterTopic;
+                }
+                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    deadLetterTopic = oldDeadLetterTopic;
+                }
+            } catch (Exception e) {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#discussion_r684872055



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
##########
@@ -122,11 +124,17 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
-                retryLetterTopic = oldRetryLetterTopic;
-            }
-            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
-                deadLetterTopic = oldDeadLetterTopic;
+            try {
+                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    retryLetterTopic = oldRetryLetterTopic;
+                }
+                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    deadLetterTopic = oldDeadLetterTopic;
+                }
+            } catch (InterruptedException | TimeoutException | ExecutionException e) {
+                return FutureUtil.failedFuture(e);

Review comment:
       Should return FutureUtil.failedFuture(e.getCause()) if e is ExecutionException?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
##########
@@ -297,7 +298,14 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
     }
 
     private Topic getTopicReference(TopicName topicName) {
-        return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        try {
+            return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e.getCause());

Review comment:
       Should throw RestException here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#issuecomment-897325098


   This PR depend on #11265 , which is a feature. we move this pr to 2.8.2 first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#discussion_r684787679



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
##########
@@ -258,7 +261,12 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
         CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(bundle);
 
         if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
-            return future.join();
+            try {
+                return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Get owned bundle failed ", e);
+                return null;

Review comment:
       This is a behaviour change.
   We should fail and not return null

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
##########
@@ -122,11 +122,17 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
-                retryLetterTopic = oldRetryLetterTopic;
-            }
-            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
-                deadLetterTopic = oldDeadLetterTopic;
+            try {
+                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    retryLetterTopic = oldRetryLetterTopic;
+                }
+                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    deadLetterTopic = oldDeadLetterTopic;
+                }
+            } catch (Exception e) {

Review comment:
       Can we catch the specific exception? This is a new 'catch' block




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#discussion_r684788645



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
##########
@@ -258,7 +261,12 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
         CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(bundle);
 
         if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
-            return future.join();
+            try {
+                return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Get owned bundle failed ", e);
+                return null;

Review comment:
       Nice review, done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #11597: Use `get` instead of `join` to avoid getting stuck

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #11597:
URL: https://github.com/apache/pulsar/pull/11597#discussion_r684873832



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
##########
@@ -122,11 +124,17 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
-                retryLetterTopic = oldRetryLetterTopic;
-            }
-            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
-                deadLetterTopic = oldDeadLetterTopic;
+            try {
+                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    retryLetterTopic = oldRetryLetterTopic;
+                }
+                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    deadLetterTopic = oldDeadLetterTopic;
+                }
+            } catch (InterruptedException | TimeoutException | ExecutionException e) {
+                return FutureUtil.failedFuture(e);

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org