You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 23:07:22 UTC
[pulsar] branch master updated: Fix some async method problems at PersistentTopicsBase. (#6483)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 47ca8e6 Fix some async method problems at PersistentTopicsBase. (#6483)
47ca8e6 is described below
commit 47ca8e64d8f36d89af587a7b9a8865622701e109
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Mar 7 07:07:12 2020 +0800
Fix some async method problems at PersistentTopicsBase. (#6483)
---
.../apache/pulsar/broker/admin/AdminResource.java | 20 +-
.../broker/admin/impl/PersistentTopicsBase.java | 1047 +++++++++++---------
2 files changed, 610 insertions(+), 457 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a216982..98d370f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -571,16 +571,24 @@ public abstract class AdminResource extends PulsarWebResource {
protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
- validateClusterOwnership(topicName.getCluster());
- // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
- // producer/consumer
- validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+ try {
+ validateClusterOwnership(topicName.getCluster());
+ // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+ // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+ // producer/consumer
+ validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
try {
checkConnect(topicName);
} catch (WebApplicationException e) {
- validateAdminAccessForTenant(topicName.getTenant());
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception ex) {
+ return FutureUtil.failedFuture(ex);
+ }
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2a95d1..d74017c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -539,50 +539,58 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
- validateAdminAccessForTenant(topicName.getTenant());
-
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
final CompletableFuture<Void> future = new CompletableFuture<>();
-
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final AtomicInteger count = new AtomicInteger(numPartitions);
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
- .whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof NotFoundException) {
- // if the sub-topic is not found, the client might not have called create
- // producer or it might have been deleted earlier, so we ignore the 404 error.
- // For all other exception, we fail the delete partition method even if a single
- // partition is failed to be deleted
- if (log.isDebugEnabled()) {
- log.debug("[{}] Partition not found: {}", clientAppId(),
- topicNamePartition);
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMeta -> {
+ final int numPartitions = partitionMeta.partitions;
+ if (numPartitions > 0) {
+ final AtomicInteger count = new AtomicInteger(numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
+ .whenComplete((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof NotFoundException) {
+ // if the sub-topic is not found, the client might not have called create
+ // producer or it might have been deleted earlier, so we ignore the 404 error.
+ // For all other exception, we fail the delete partition method even if a single
+ // partition is failed to be deleted
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Partition not found: {}", clientAppId(),
+ topicNamePartition);
+ }
+ } else {
+ log.error("[{}] Failed to delete partition {}", clientAppId(),
+ topicNamePartition, ex);
+ future.completeExceptionally(ex);
+ return;
}
} else {
- log.error("[{}] Failed to delete partition {}", clientAppId(),
- topicNamePartition, ex);
- future.completeExceptionally(ex);
- return;
+ log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
}
- } else {
- log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
- });
- } catch (Exception e) {
- log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
- future.completeExceptionally(e);
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
+ future.completeExceptionally(e);
+ }
}
+ } else {
+ future.complete(null);
}
- } else {
- future.complete(null);
- }
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
future.whenComplete((r, ex) -> {
if (ex != null) {
@@ -593,6 +601,9 @@ public class PersistentTopicsBase extends AdminResource {
} else if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
+ } else if (ex instanceof WebApplicationException) {
+ asyncResponse.resume(ex);
+ return;
} else {
asyncResponse.resume(new RestException(ex));
return;
@@ -602,30 +613,48 @@ public class PersistentTopicsBase extends AdminResource {
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
- try {
- globalZk().delete(path, -1);
- globalZkCache().invalidate(path);
- // Sync data to all quorums and the observers
- zkSync(path);
- log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
- asyncResponse.resume(Response.noContent().build());
- } catch (KeeperException.NoNodeException nne) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"));
- } catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(),
- topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
- } catch (Exception e) {
- log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
- asyncResponse.resume(new RestException(e));
- }
+
+ globalZk().delete(path, -1, (rc, s, o) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ try {
+ globalZkCache().invalidate(path);
+ globalZk().sync(path, (rc2, s2, ctx) -> {
+ if (KeeperException.Code.OK.intValue() == rc2) {
+ log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+ asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+ }
+ }, null);
+ } catch (Exception e) {
+ log.error("Failed to delete partitioned topic.", e);
+ asyncResponse.resume(new RestException(e));
+ }
+ } else if (KeeperException.Code.NONODE.intValue() == rc) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"));
+ } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+ log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(),
+ topicName);
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ } else {
+ log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+ asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+ }
+ }, null);
});
}
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
@@ -651,32 +680,43 @@ public class PersistentTopicsBase extends AdminResource {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+ } else if (th instanceof WebApplicationException) {
+ asyncResponse.resume(th);
} else {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
}
- return null;
+ } else {
+ asyncResponse.resume(Response.noContent().build());
}
-
- asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
}
}).exceptionally(t -> {
- Throwable th = t.getCause();
- asyncResponse.resume(new RestException(th));
+ log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, t);
+ if (t instanceof WebApplicationException) {
+ asyncResponse.resume(t);
+ } else {
+ asyncResponse.resume(new RestException(t));
+ }
return null;
});
}
}
private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
-
- Topic topic = getTopicReference(topicName);
+ Topic topic;
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ validateTopicOwnership(topicName, authoritative);
+ topic = getTopicReference(topicName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
topic.close(false).whenComplete((r, ex) -> {
if (ex != null) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex);
@@ -726,66 +766,73 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get subscriptions for topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- try {
- // get the subscriptions only from the 1st partition since all the other partitions will have the same
- // subscriptions
- pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
- .whenComplete((r, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
- topicName, ex.getMessage());
-
- if (ex instanceof PulsarAdminException) {
- PulsarAdminException pae = (PulsarAdminException) ex;
- if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Internal topics have not been generated yet"));
- return;
- } else {
- asyncResponse.resume(new RestException(pae));
- return;
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ try {
+ // get the subscriptions only from the 1st partition since all the other partitions will have the same
+ // subscriptions
+ pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
+ .whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
+ topicName, ex.getMessage());
+
+ if (ex instanceof PulsarAdminException) {
+ PulsarAdminException pae = (PulsarAdminException) ex;
+ if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Internal topics have not been generated yet"));
+ return;
+ } else {
+ asyncResponse.resume(new RestException(pae));
+ return;
+ }
+ } else {
+ asyncResponse.resume(new RestException(ex));
+ return;
+ }
}
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
- }
- final List<String> subscriptions = Lists.newArrayList();
- subscriptions.addAll(r);
- asyncResponse.resume(subscriptions);
- return;
- });
- } catch (Exception e) {
- log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
- asyncResponse.resume(e);
- return;
+ final List<String> subscriptions = Lists.newArrayList();
+ subscriptions.addAll(r);
+ asyncResponse.resume(subscriptions);
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
+ asyncResponse.resume(e);
+ }
+ } else {
+ internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
}
- } else {
- internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
- }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get subscriptions for topic {}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminOperationOnTopic(authoritative);
- Topic topic = getTopicReference(topicName);
try {
+ validateAdminOperationOnTopic(authoritative);
+ Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
asyncResponse.resume(subscriptions);
- return;
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
- asyncResponse.resume(new RestException(e));
- return;
+ resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@@ -810,11 +857,18 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse) {
- validateAdminAccessForTenant(topicName.getTenant());
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ String managedLedger;
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ managedLedger = topicName.getPersistenceNamingEncoding();
+ } catch (Exception e) {
+ log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
}
- String managedLedger = topicName.getPersistenceNamingEncoding();
pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
@@ -832,252 +886,304 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition, boolean getPreciseBacklog) {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions == 0) {
- throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
- }
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
-
- List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
- topicStatsFutureList
- .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+ return;
+ }
+ PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
+ List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ topicStatsFutureList
+ .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
- CompletableFuture<TopicStats> statFuture = null;
- for (int i = 0; i < topicStatsFutureList.size(); i++) {
- statFuture = topicStatsFutureList.get(i);
- if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
- try {
- stats.add(statFuture.get());
- if (perPartition) {
- stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+ FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+ CompletableFuture<TopicStats> statFuture = null;
+ for (int i = 0; i < topicStatsFutureList.size(); i++) {
+ statFuture = topicStatsFutureList.get(i);
+ if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+ try {
+ stats.add(statFuture.get());
+ if (perPartition) {
+ stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+ }
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return null;
}
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return null;
}
}
- }
- if (perPartition && stats.partitions.isEmpty()) {
- String path = ZkAdminPaths.partitionedTopicPath(topicName);
- try {
- boolean zkPathExists = zkPathExists(path);
- if (zkPathExists) {
- stats.partitions.put(topicName.toString(), new TopicStats());
- } else {
- asyncResponse.resume(
- new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ if (perPartition && stats.partitions.isEmpty()) {
+ String path = ZkAdminPaths.partitionedTopicPath(topicName);
+ try {
+ boolean zkPathExists = zkPathExists(path);
+ if (zkPathExists) {
+ stats.partitions.put(topicName.toString(), new TopicStats());
+ } else {
+ asyncResponse.resume(
+ new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ return null;
+ }
+ } catch (KeeperException | InterruptedException e) {
+ asyncResponse.resume(new RestException(e));
return null;
}
- } catch (KeeperException | InterruptedException e) {
- asyncResponse.resume(new RestException(e));
- return null;
}
- }
- asyncResponse.resume(stats);
+ asyncResponse.resume(stats);
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions == 0) {
- throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
- }
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
-
- List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
- topicStatsFutureList.add(pulsar().getAdminClient().topics()
- .getInternalStatsAsync((topicName.getPartition(i).toString())));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+ return;
+ }
- FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
- CompletableFuture<PersistentTopicInternalStats> statFuture = null;
- for (int i = 0; i < topicStatsFutureList.size(); i++) {
- statFuture = topicStatsFutureList.get(i);
- if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
- try {
- stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return null;
- }
+ PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
+
+ List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ topicStatsFutureList.add(pulsar().getAdminClient().topics()
+ .getInternalStatsAsync((topicName.getPartition(i).toString())));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
}
}
- asyncResponse.resume(!stats.partitions.isEmpty() ? stats
- : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+
+ FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+ CompletableFuture<PersistentTopicInternalStats> statFuture = null;
+ for (int i = 0; i < topicStatsFutureList.size(); i++) {
+ statFuture = topicStatsFutureList.get(i);
+ if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+ try {
+ stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+ }
+ }
+ asyncResponse.resume(!stats.partitions.isEmpty() ? stats
+ : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .deleteSubscriptionAsync(topicNamePartition.toString(), subName));
- } catch (Exception e) {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
- e);
- asyncResponse.resume(new RestException(e));
- return;
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+ .deleteSubscriptionAsync(topicNamePartition.toString(), subName));
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
+ e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
}
- }
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else if (t instanceof PreconditionFailedException) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Subscription has active connected consumers"));
- return null;
- } else {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
- return null;
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else if (t instanceof PreconditionFailedException) {
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ return null;
+ } else {
+ log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
}
- }
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- } else {
- internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
- }
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+ internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
- validateAdminAccessForSubscriber(subName, authoritative);
- Topic topic = getTopicReference(topicName);
try {
+ validateAdminAccessForSubscriber(subName, authoritative);
+ Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
sub.delete().get();
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
- Throwable t = e.getCause();
- if (e instanceof NullPointerException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- } else if (t instanceof SubscriptionBusyException) {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
+ if (e.getCause() instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
+ } else if (e instanceof WebApplicationException) {
+ asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
- asyncResponse.resume(new RestException(t));
+ asyncResponse.resume(new RestException(e));
}
}
}
+ /**
+ * Skips all messages on subscription {@code subName} and resumes {@code asyncResponse} with the result.
+ * When {@code topicName} is already a partition name the request is handled locally. Otherwise the
+ * partition metadata is fetched asynchronously: for a partitioned topic the request is fanned out to
+ * every partition through the admin client and the response is resumed once all of them complete
+ * (404 when the failure cause is a {@code NotFoundException}); for a non-partitioned topic the
+ * request is handled locally.
+ *
+ * @param asyncResponse response handle resumed with 204 on success or with the failure
+ * @param subName subscription whose messages are skipped
+ * @param authoritative forwarded to the metadata lookup and to the non-partitioned handler
+ */
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
- subName));
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
- asyncResponse.resume(new RestException(e));
- return;
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
+ subName));
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
}
- }
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
- return null;
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
}
- }
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- } else {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
- }
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+ internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
+ }
+ }).exceptionally(ex -> {
+ // Reached when the metadata lookup fails or the thenAccept callback above throws.
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
+ /**
+ * Clears the backlog of {@code subName} on the topic named by {@code topicName}.
+ * A subscription name starting with the topic's replicator prefix is resolved to the matching
+ * replicator; any other name is resolved to a regular subscription. A missing replicator or
+ * subscription resumes the response with 404. The outcome of {@code clearBacklog()} is reported
+ * through {@code asyncResponse} (204 on success, {@code RestException} on failure).
+ *
+ * @param asyncResponse response handle resumed with the outcome
+ * @param subName subscription (or replicator subscription) name
+ * @param authoritative forwarded to {@code validateAdminAccessForSubscriber}
+ */
private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
- validateAdminAccessForSubscriber(subName, authoritative);
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
- if (ex != null) {
- asyncResponse.resume(new RestException(ex));
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
- } else {
- asyncResponse.resume(Response.noContent().build());
- log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
- }
- };
try {
+ // Validation and topic lookup run inside the try so their exceptions resume the async response.
+ validateAdminAccessForSubscriber(subName, authoritative);
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ // Shared completion callback for both the replicator and the subscription path.
+ BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+ if (ex != null) {
+ asyncResponse.resume(new RestException(ex));
+ log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
+ }
+ };
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
- checkNotNull(repl);
+ if (repl == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
repl.clearBacklog().whenComplete(biConsumer);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
sub.clearBacklog().whenComplete(biConsumer);
}
} catch (Exception e) {
- if (e instanceof NullPointerException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- } else {
- asyncResponse.resume(new RestException(e));
- }
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@@ -1115,54 +1221,72 @@ public class PersistentTopicsBase extends AdminResource {
+ /**
+ * Expires messages older than {@code expireTimeInSeconds} for all subscriptions of the topic and
+ * resumes {@code asyncResponse} with the result. When {@code topicName} is already a partition
+ * name the request is handled locally. Otherwise the partition metadata is fetched asynchronously:
+ * a partitioned topic fans the request out to every partition through the admin client and
+ * answers once all partition calls complete; a non-partitioned topic is handled locally.
+ *
+ * @param asyncResponse response handle resumed with 204 on success or with the failure
+ * @param expireTimeInSeconds expiry threshold passed through to each partition / the local handler
+ * @param authoritative forwarded to the metadata lookup and to the non-partitioned handler
+ */
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
- topicNamePartition.toString(), expireTimeInSeconds));
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
- topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
+ // expire messages for each partition topic
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
+ topicNamePartition.toString(), expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+ topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
}
- }
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
- return null;
- }
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+ topicName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- } else {
- internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
- }
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+ internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+ }
+ }).exceptionally(ex -> {
+ // Reached when the metadata lookup fails or the thenAccept callback above throws.
+ log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
// validate ownership and redirect if current broker is not owner
- validateAdminOperationOnTopic(authoritative);
+ PersistentTopic topic;
+ try {
+ validateAdminOperationOnTopic(authoritative);
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ topic = (PersistentTopic) getTopicReference(topicName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
final AtomicReference<Throwable> exception = new AtomicReference<>();
topic.getReplicators().forEach((subName, replicator) -> {
@@ -1198,111 +1322,124 @@ public class PersistentTopicsBase extends AdminResource {
+ /**
+ * Resets the cursor of subscription {@code subName} to {@code timestamp} and resumes
+ * {@code asyncResponse} with the result. When {@code topicName} is already a partition name the
+ * request is handled locally. Otherwise the partition metadata is fetched asynchronously: a
+ * partitioned topic issues one reset per partition through the admin client; a non-partitioned
+ * topic is handled locally.
+ * <p>
+ * For a partitioned topic, a {@code PreconditionFailedException} on a partition is only counted;
+ * the request fails with 412 when every partition reported it. Any other partition error fails
+ * the whole request.
+ *
+ * @param asyncResponse response handle resumed with 204 on success or with the failure
+ * @param subName subscription whose cursor is reset
+ * @param timestamp target time passed through to each partition / the local handler
+ * @param authoritative forwarded to the metadata lookup and to the non-partitioned handler
+ */
protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
boolean authoritative) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, timestamp, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final AtomicInteger count = new AtomicInteger(numPartitions);
- final AtomicInteger failureCount = new AtomicInteger(0);
- final AtomicReference<Throwable> partitionException = new AtomicReference<>();
-
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- pulsar().getAdminClient().topics()
- .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PreconditionFailedException) {
- // throw the last exception if all partitions get this error
- // any other exception on partition is reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
- clientAppId(), topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ final int numPartitions = partitionMetadata.partitions;
+ if (numPartitions > 0) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ final AtomicInteger count = new AtomicInteger(numPartitions);
+ final AtomicInteger failureCount = new AtomicInteger(0);
+ final AtomicReference<Throwable> partitionException = new AtomicReference<>();
+
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ pulsar().getAdminClient().topics()
+ .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof PreconditionFailedException) {
+ // throw the last exception if all partitions get this error
+ // any other exception on partition is reported back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+ clientAppId(), topicNamePartition, subName, timestamp, ex);
+ future.completeExceptionally(ex);
+ return null;
+ }
}
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
- future.completeExceptionally(e);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
+ topicNamePartition, subName, timestamp, e);
+ future.completeExceptionally(e);
+ }
}
- }
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
+ future.whenComplete((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException((PulsarAdminException) ex));
+ return;
+ } else {
+ asyncResponse.resume(new RestException(ex));
+ return;
+ }
}
- }
- // report an error to user if unable to reset for all partitions
- if (failureCount.get() == numPartitions) {
- log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
- subName, timestamp, partitionException.get());
- asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
- return;
- } else if (failureCount.get() > 0) {
- log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
- topicName, subName, timestamp, partitionException.get());
- }
+ // report an error to user if unable to reset for all partitions
+ if (failureCount.get() == numPartitions) {
+ log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
+ subName, timestamp, partitionException.get());
+ asyncResponse.resume(
+ new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
+ return;
+ } else if (failureCount.get() > 0) {
+ log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
+ topicName, subName, timestamp, partitionException.get());
+ }
- asyncResponse.resume(Response.noContent().build());
- });
- } else {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
- }
+ asyncResponse.resume(Response.noContent().build());
+ });
+ } else {
+ internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+ }
+ }).exceptionally(ex -> {
+ // Reached when the metadata lookup fails or the thenAccept callback above throws.
+ log.error("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, timestamp, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
+ /**
+ * Resets the cursor of subscription {@code subName} on the topic named by {@code topicName} to
+ * {@code timestamp} and resumes {@code asyncResponse} with the outcome: 204 on success, 404 when
+ * the topic or the subscription is missing, 405 for a {@code NotAllowedException}, 412 when the
+ * reset fails because of a {@code SubscriptionInvalidCursorPosition}, the original exception when
+ * it is already a {@code WebApplicationException}, and a wrapping {@code RestException} otherwise.
+ *
+ * @param asyncResponse response handle that is always resumed by this method
+ * @param subName subscription whose cursor is reset
+ * @param timestamp target time passed to {@code resetCursor}
+ * @param authoritative forwarded to {@code validateAdminAccessForSubscriber}
+ */
private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
boolean authoritative) {
- validateAdminAccessForSubscriber(subName, authoritative);
- log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
- timestamp);
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
- return;
- }
try {
+ validateAdminAccessForSubscriber(subName, authoritative);
+ log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
+ timestamp);
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ if (topic == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ return;
+ }
PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
sub.resetCursor(timestamp).get();
log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
- Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
subName, timestamp, e);
- if (e instanceof NullPointerException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- } else if (e instanceof NotAllowedException) {
+ if (e instanceof NotAllowedException) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
- } else if (t instanceof SubscriptionInvalidCursorPosition) {
+ // resetCursor(...).get() reports failures wrapped in an ExecutionException, so inspect the cause.
+ } else if (e.getCause() instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for timestamp specified -" + t.getMessage()));
+ "Unable to find position for timestamp specified -" + e.getCause().getMessage()));
+ } else if (e instanceof WebApplicationException) {
+ asyncResponse.resume(e);
} else {
asyncResponse.resume(new RestException(e));
}
@@ -1312,7 +1449,13 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean replicated) {
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
}
final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
@@ -1321,86 +1464,88 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
} else {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final AtomicInteger count = new AtomicInteger(numPartitions);
- final AtomicInteger failureCount = new AtomicInteger(0);
- final AtomicReference<Throwable> partitionException = new AtomicReference<>();
-
- // Create the subscription on each partition
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- pulsar().getAdminClient().topics()
- .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
- .handle((r, ex) -> {
- if (ex != null) {
- // fail the operation on unknown exception or if all the partitioned failed due to
- // subscription-already-exist
- if (failureCount.incrementAndGet() == numPartitions
- || !(ex instanceof PulsarAdminException.ConflictException)) {
- partitionException.set(ex);
- }
- }
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ final int numPartitions = partitionMetadata.partitions;
+ if (numPartitions > 0) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ final AtomicInteger count = new AtomicInteger(numPartitions);
+ final AtomicInteger failureCount = new AtomicInteger(0);
+ final AtomicReference<Throwable> partitionException = new AtomicReference<>();
+
+ // Create the subscription on each partition
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ pulsar().getAdminClient().topics()
+ .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
+ .handle((r, ex) -> {
+ if (ex != null) {
+ // fail the operation on unknown exception or if all the partitioned failed due to
+ // subscription-already-exist
+ if (failureCount.incrementAndGet() == numPartitions
+ || !(ex instanceof PulsarAdminException.ConflictException)) {
+ partitionException.set(ex);
+ }
+ }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
- topicNamePartition, subscriptionName, targetMessageId, e);
- future.completeExceptionally(e);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
+ topicNamePartition, subscriptionName, targetMessageId, e);
+ future.completeExceptionally(e);
+ }
}
- }
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
+ future.whenComplete((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException((PulsarAdminException) ex));
+ return;
+ } else {
+ asyncResponse.resume(new RestException(ex));
+ return;
+ }
}
- }
- if (partitionException.get() != null) {
- log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
- subscriptionName, targetMessageId, partitionException.get());
- if (partitionException.get() instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
- return;
- } else {
- asyncResponse.resume(new RestException(partitionException.get()));
- return;
+ if (partitionException.get() != null) {
+ log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
+ subscriptionName, targetMessageId, partitionException.get());
+ if (partitionException.get() instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
+ return;
+ } else {
+ asyncResponse.resume(new RestException(partitionException.get()));
+ return;
+ }
}
- }
- asyncResponse.resume(Response.noContent().build());
- });
- } else {
- internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
- }
+ asyncResponse.resume(Response.noContent().build());
+ });
+ } else {
+ internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
- validateAdminAccessForSubscriber(subscriptionName, authoritative);
-
- PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
-
- if (topic.getSubscriptions().containsKey(subscriptionName)) {
- asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
- return;
- }
-
try {
+ validateAdminAccessForSubscriber(subscriptionName, authoritative);
+ PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
+ if (topic.getSubscriptions().containsKey(subscriptionName)) {
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
+ return;
+ }
PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
@@ -1414,10 +1559,10 @@ public class PersistentTopicsBase extends AdminResource {
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
- return;
+ } else if (e instanceof WebApplicationException) {
+ asyncResponse.resume(e);
} else {
asyncResponse.resume(new RestException(e));
- return;
}
}