You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 03:52:01 UTC
[pulsar] branch branch-2.6 updated (7e59203 -> 67d3d98)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.
from 7e59203 passes on authenticationData (#7694)
new e40719f Close the previous reader of the healthcheck topic (#7724)
new 67d3d98 Change some WebApplicationException log level to debug (#7725)
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../pulsar/broker/admin/impl/BrokersBase.java | 9 +++-
.../broker/admin/impl/PersistentTopicsBase.java | 50 +++++++++++++++++++---
2 files changed, 51 insertions(+), 8 deletions(-)
[pulsar] 02/02: Change some WebApplicationException log level to
debug (#7725)
Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 67d3d98653335ded5e4bfa306e9dc573e210526a
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Mon Aug 3 23:59:55 2020 +0800
Change some WebApplicationException log level to debug (#7725)
### Motivation
Some users may face the following `Temporary Redirect` issue when the requested topic is not owned by the current broker:
```
19:21:48.215 [pulsar-web-42-5] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] Failed to get list of subscriptions for persistent://default_tenant/default_namespace/default_topic-partition-0
javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateReadOperationOnTopic(PersistentTopicsBase.java:245)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptionsForNonPartitionedTopic(PersistentTopicsBase.java:874)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptions(PersistentTopicsBase.java:825)
at org.apache.pulsar.broker.admin.v2.PersistentTopics.getSubscriptions(PersistentTopics.java:461)
09:41:45.485 [pulsar-web-42-7] WARN org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] [persistent://public/default/test-partition-1] Failed to create subscription consumer-test at message id -1:-1:-1
javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect
at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateAdminAccessForSubscriber(PersistentTopicsBase.java:283)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:1752)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscription(PersistentTopicsBase.java:1673)
at org.apache.pulsar.broker.admin.v2.PersistentTopics.createSubscription(PersistentTopics.java:816)
```
As discussed in https://github.com/apache/pulsar/issues/7189, it is better to use debug level when this happens.
### Modifications
Change the log level of some `WebApplicationException` messages from error/warn to debug in `PersistentTopicsBase`.
(cherry picked from commit 8061c547327b46700c8e66f2d829e258f21a292c)
---
.../broker/admin/impl/PersistentTopicsBase.java | 50 +++++++++++++++++++---
1 file changed, 44 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index afa4fe7..b3f839b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -595,6 +595,13 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
try {
validateWriteOperationOnTopic(authoritative);
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ return;
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -876,6 +883,13 @@ public class PersistentTopicsBase extends AdminResource {
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
asyncResponse.resume(subscriptions);
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ return;
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1204,11 +1218,15 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
- log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
if (e.getCause() instanceof SubscriptionBusyException) {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
} else if (e instanceof WebApplicationException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, e);
+ }
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
@@ -1288,8 +1306,11 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
- log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
if (e instanceof WebApplicationException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, e);
+ }
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
@@ -1384,6 +1405,12 @@ public class PersistentTopicsBase extends AdminResource {
}
sub.clearBacklog().whenComplete(biConsumer);
}
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
} catch (Exception e) {
log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1485,6 +1512,13 @@ public class PersistentTopicsBase extends AdminResource {
validateWriteOperationOnTopic(authoritative);
topic = (PersistentTopic) getTopicReference(topicName);
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ return;
} catch (Exception e) {
log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
@@ -1761,12 +1795,14 @@ public class PersistentTopicsBase extends AdminResource {
.get();
} catch (Throwable e) {
Throwable t = e.getCause();
- log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
- subscriptionName, targetMessageId, e);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (e instanceof WebApplicationException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName,
+ subscriptionName, targetMessageId, e);
+ }
asyncResponse.resume(e);
} else {
asyncResponse.resume(new RestException(e));
@@ -2625,8 +2661,10 @@ public class PersistentTopicsBase extends AdminResource {
validateReadOperationOnTopic(authoritative);
topic = getTopicReference(topicName);
} catch (WebApplicationException wae) {
- log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
- clientAppId(), topicName, wae);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
resumeAsyncResponseExceptionally(asyncResponse, wae);
return;
} catch (Exception e) {
[pulsar] 01/02: Close the previous reader of the healthcheck topic
(#7724)
Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e40719f844cf51c0d0c8eb2e330672dc26214c1d
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800
Close the previous reader of the healthcheck topic (#7724)
(cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
.../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 072e91c..e78427c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -261,9 +264,11 @@ public class BrokersBase extends AdminResource {
PulsarClient client = pulsar().getClient();
String messageStr = UUID.randomUUID().toString();
- // create non-partitioned topic manually
+ // create non-partitioned topic manually and close the previous reader if present.
try {
- pulsar().getBrokerService().getTopic(topic, true).get();
+ pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+ t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+ });
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;