You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 03:52:01 UTC

[pulsar] branch branch-2.6 updated (7e59203 -> 67d3d98)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 7e59203  passes on authenticationData (#7694)
     new e40719f  Close the previous reader of the healthcheck topic (#7724)
     new 67d3d98  Change some WebApplicationException log level to debug (#7725)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/admin/impl/BrokersBase.java      |  9 +++-
 .../broker/admin/impl/PersistentTopicsBase.java    | 50 +++++++++++++++++++---
 2 files changed, 51 insertions(+), 8 deletions(-)


[pulsar] 02/02: Change some WebApplicationException log level to debug (#7725)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 67d3d98653335ded5e4bfa306e9dc573e210526a
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Mon Aug 3 23:59:55 2020 +0800

    Change some WebApplicationException log level to debug (#7725)
    
    ### Motivation
    Some users may face the following `Temporary Redirect` issue when the requested topic is not owned by the current broker:
    ```
    19:21:48.215 [pulsar-web-42-5] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] Failed to get list of subscriptions for persistent://default_tenant/default_namespace/default_topic-partition-0
    javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
    	at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateReadOperationOnTopic(PersistentTopicsBase.java:245)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptionsForNonPartitionedTopic(PersistentTopicsBase.java:874)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptions(PersistentTopicsBase.java:825)
    	at org.apache.pulsar.broker.admin.v2.PersistentTopics.getSubscriptions(PersistentTopics.java:461)
    
    09:41:45.485 [pulsar-web-42-7] WARN  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] [persistent://public/default/test-partition-1] Failed to create subscription consumer-test at message id -1:-1:-1
    javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
            at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateAdminAccessForSubscriber(PersistentTopicsBase.java:283)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:1752)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscription(PersistentTopicsBase.java:1673)
            at org.apache.pulsar.broker.admin.v2.PersistentTopics.createSubscription(PersistentTopics.java:816)
    ```
    As discussed in https://github.com/apache/pulsar/issues/7189, it is better to log at debug level when this happens.
    
    ### Modifications
    
    Change the log level of some `WebApplicationException` messages from error/warn to debug in `PersistentTopicsBase`.
    
    (cherry picked from commit 8061c547327b46700c8e66f2d829e258f21a292c)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 50 +++++++++++++++++++---
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index afa4fe7..b3f839b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -595,6 +595,13 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
         try {
             validateWriteOperationOnTopic(authoritative);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -876,6 +883,13 @@ public class PersistentTopicsBase extends AdminResource {
             final List<String> subscriptions = Lists.newArrayList();
             topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
             asyncResponse.resume(subscriptions);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1204,11 +1218,15 @@ public class PersistentTopicsBase extends AdminResource {
             log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
             if (e.getCause() instanceof SubscriptionBusyException) {
+                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Subscription has active connected consumers"));
             } else if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
+                            clientAppId(), topicName, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
@@ -1288,8 +1306,11 @@ public class PersistentTopicsBase extends AdminResource {
             log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
             if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.",
+                            clientAppId(), topicName, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
@@ -1384,6 +1405,12 @@ public class PersistentTopicsBase extends AdminResource {
                 }
                 sub.clearBacklog().whenComplete(biConsumer);
             }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
         } catch (Exception e) {
             log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1485,6 +1512,13 @@ public class PersistentTopicsBase extends AdminResource {
             validateWriteOperationOnTopic(authoritative);
 
             topic = (PersistentTopic) getTopicReference(topicName);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1761,12 +1795,14 @@ public class PersistentTopicsBase extends AdminResource {
                 .get();
         } catch (Throwable e) {
             Throwable t = e.getCause();
-            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
-                subscriptionName, targetMessageId, e);
             if (t instanceof SubscriptionInvalidCursorPosition) {
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Unable to find position for position specified: " + t.getMessage()));
             } else if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName,
+                            subscriptionName, targetMessageId, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 asyncResponse.resume(new RestException(e));
@@ -2625,8 +2661,10 @@ public class PersistentTopicsBase extends AdminResource {
             validateReadOperationOnTopic(authoritative);
             topic = getTopicReference(topicName);
         } catch (WebApplicationException wae) {
-            log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
-                    clientAppId(), topicName, wae);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
             resumeAsyncResponseExceptionally(asyncResponse, wae);
             return;
         } catch (Exception e) {


[pulsar] 01/02: Close the previous reader of the healthcheck topic (#7724)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e40719f844cf51c0d0c8eb2e330672dc26214c1d
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800

    Close the previous reader of the healthcheck topic (#7724)
    
    
    (cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
 .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 072e91c..e78427c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -261,9 +264,11 @@ public class BrokersBase extends AdminResource {
         PulsarClient client = pulsar().getClient();
 
         String messageStr = UUID.randomUUID().toString();
-        // create non-partitioned topic manually
+        // create non-partitioned topic manually and close the previous reader if present.
         try {
-            pulsar().getBrokerService().getTopic(topic, true).get();
+            pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+                t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+            });
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;