You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 03:52:03 UTC

[pulsar] 02/02: Change some WebApplicationException log level to debug (#7725)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 67d3d98653335ded5e4bfa306e9dc573e210526a
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Mon Aug 3 23:59:55 2020 +0800

    Change some WebApplicationException log level to debug (#7725)
    
    ### Motivation
    Some user may face the following `Temporary Redirect` issue when the request topic not owned by the current broker:
    ```
    19:21:48.215 [pulsar-web-42-5] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] Failed to get list of subscriptions for persistent://default_tenant/default_namespace/default_topic-partition-0
    javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
    	at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateReadOperationOnTopic(PersistentTopicsBase.java:245)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptionsForNonPartitionedTopic(PersistentTopicsBase.java:874)
    	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptions(PersistentTopicsBase.java:825)
    	at org.apache.pulsar.broker.admin.v2.PersistentTopics.getSubscriptions(PersistentTopics.java:461)
    
    09:41:45.485 [pulsar-web-42-7] WARN  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] [persistent://public/default/test-partition-1] Failed to create subscription consumer-test at message id -1:-1:-1
    javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
            at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateAdminAccessForSubscriber(PersistentTopicsBase.java:283)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:1752)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscription(PersistentTopicsBase.java:1673)
            at org.apache.pulsar.broker.admin.v2.PersistentTopics.createSubscription(PersistentTopics.java:816)
    ```
     As discussed in https://github.com/apache/pulsar/issues/7189, it is better to use debug level when this happens.
    
    ### Modifications
    
    Change some `WebApplicationException` log level from error/warn to debug in `PersistentTopicsBase`.
    
    (cherry picked from commit 8061c547327b46700c8e66f2d829e258f21a292c)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 50 +++++++++++++++++++---
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index afa4fe7..b3f839b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -595,6 +595,13 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
         try {
             validateWriteOperationOnTopic(authoritative);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -876,6 +883,13 @@ public class PersistentTopicsBase extends AdminResource {
             final List<String> subscriptions = Lists.newArrayList();
             topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
             asyncResponse.resume(subscriptions);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1204,11 +1218,15 @@ public class PersistentTopicsBase extends AdminResource {
             log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
             if (e.getCause() instanceof SubscriptionBusyException) {
+                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Subscription has active connected consumers"));
             } else if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
+                            clientAppId(), topicName, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
@@ -1288,8 +1306,11 @@ public class PersistentTopicsBase extends AdminResource {
             log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
             asyncResponse.resume(Response.noContent().build());
         } catch (Exception e) {
-            log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
             if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.",
+                            clientAppId(), topicName, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
@@ -1384,6 +1405,12 @@ public class PersistentTopicsBase extends AdminResource {
                 }
                 sub.clearBacklog().whenComplete(biConsumer);
             }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
         } catch (Exception e) {
             log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1485,6 +1512,13 @@ public class PersistentTopicsBase extends AdminResource {
             validateWriteOperationOnTopic(authoritative);
 
             topic = (PersistentTopic) getTopicReference(topicName);
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+            return;
         } catch (Exception e) {
             log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1761,12 +1795,14 @@ public class PersistentTopicsBase extends AdminResource {
                 .get();
         } catch (Throwable e) {
             Throwable t = e.getCause();
-            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
-                subscriptionName, targetMessageId, e);
             if (t instanceof SubscriptionInvalidCursorPosition) {
                 asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                     "Unable to find position for position specified: " + t.getMessage()));
             } else if (e instanceof WebApplicationException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName,
+                            subscriptionName, targetMessageId, e);
+                }
                 asyncResponse.resume(e);
             } else {
                 asyncResponse.resume(new RestException(e));
@@ -2625,8 +2661,10 @@ public class PersistentTopicsBase extends AdminResource {
             validateReadOperationOnTopic(authoritative);
             topic = getTopicReference(topicName);
         } catch (WebApplicationException wae) {
-            log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
-                    clientAppId(), topicName, wae);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
             resumeAsyncResponseExceptionally(asyncResponse, wae);
             return;
         } catch (Exception e) {