You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:17 UTC

[pulsar] 23/27: Expire message by position. (#9514)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 00459dbc02479dcc42c17a27862884b6ee158c62
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Tue Feb 9 16:16:29 2021 -0800

    Expire message by position. (#9514)
    
    Fixes #2736
    
    Add ability to expire message for subscription by position to admin client and admin rest API.
    
    Update PersistentMessageExpiryMonitor to be able to expire messages by position, and expose this through the admin client and admin REST API.
    
    This change added tests and can be verified as follows:
     - Added unit test for expire message by position in PersistentMessageFinderTest.
     - Added test for expire message by position in admin client/admin rest api.
    
    (cherry picked from commit 301d42ce04ceb20797cc2a60267e6ec39ff05e95)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 253 ++++++++++++++-------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  41 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  42 +++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |   6 +
 .../persistent/PersistentMessageExpiryMonitor.java |  27 +++
 .../service/persistent/PersistentReplicator.java   |  12 +
 .../service/persistent/PersistentSubscription.java |   5 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  95 ++++++--
 .../pulsar/broker/service/BrokerServiceTest.java   |   1 +
 .../service/PersistentMessageFinderTest.java       |  86 ++++++-
 .../api/AuthorizationProducerConsumerTest.java     |   2 +
 .../org/apache/pulsar/client/admin/Topics.java     |  34 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  27 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   5 +
 .../org/apache/pulsar/admin/cli/CliCommand.java    |   6 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  31 ++-
 .../apache/pulsar/client/impl/ResetCursorData.java |   2 +
 18 files changed, 568 insertions(+), 109 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d957797..4c4db2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1717,7 +1717,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getReplicators().forEach((subName, replicator) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -1725,7 +1725,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getSubscriptions().forEach((subName, subscriber) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -2057,77 +2057,9 @@ public class PersistentTopicsBase extends AdminResource {
                     return;
                 }
                 CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
-                if (batchIndex >= 0) {
-                    try {
-                        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-                        ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
-                            @Override
-                            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                // Since we can't read the message from the storage layer, it might be an already delete message ID or an invalid message ID
-                                // We should fall back to non batch index seek.
-                                batchSizeFuture.complete(0);
-                            }
-
-                            @Override
-                            public void readEntryComplete(Entry entry, Object ctx) {
-                                try {
-                                    try {
-                                        if (entry == null) {
-                                            batchSizeFuture.complete(0);
-                                        } else {
-                                            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-                                            batchSizeFuture.complete(metadata.getNumMessagesInBatch());
-                                        }
-                                    } catch (Exception e) {
-                                        batchSizeFuture.completeExceptionally(new RestException(e));
-                                    }
-                                } finally {
-                                    if (entry != null) {
-                                        entry.release();
-                                    }
-                                }
-                            }
-                        }, null);
-                    } catch (NullPointerException npe) {
-                        batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
-                    } catch (Exception exception) {
-                        log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
-                                clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
-                        batchSizeFuture.completeExceptionally(new RestException(exception));
-                    }
-                } else {
-                    batchSizeFuture.complete(0);
-                }
+                getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
                 batchSizeFuture.thenAccept(bi -> {
-                    PositionImpl seekPosition;
-                    if (bi > 0) {
-                        long[] ackSet;
-                        BitSetRecyclable bitSet = BitSetRecyclable.create();
-                        bitSet.set(0, bi);
-                        if (isExcluded) {
-                            bitSet.clear(0, Math.max(batchIndex + 1, 0));
-                            if (bitSet.length() > 0) {
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                                seekPosition = seekPosition.getNext();
-                            }
-                        } else {
-                            if (batchIndex - 1 >= 0) {
-                                bitSet.clear(0, batchIndex);
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                            }
-                        }
-                        bitSet.recycle();
-                    } else {
-                        seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                        seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
-                    }
-
+                    PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                     sub.resetCursor(seekPosition).thenRun(() -> {
                         log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
                                 topicName, subName, messageId);
@@ -2159,6 +2091,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, PersistentTopic topic,
+                                   MessageIdImpl messageId, int batchIndex) {
+        if (batchIndex >= 0) { // a batch index was supplied: read the entry to learn its batch size
+            try {
+                ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+                ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+                        messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
+                    @Override
+                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                        // Since we can't read the message from the storage layer,
+                        // it might be an already deleted message ID or an invalid message ID.
+                        // Fall back to the non-batch-index path by reporting a batch size of 0.
+                        batchSizeFuture.complete(0);
+                    }
+
+                    @Override
+                    public void readEntryComplete(Entry entry, Object ctx) {
+                        try {
+                            try {
+                                if (entry == null) {
+                                    batchSizeFuture.complete(0);
+                                } else {
+                                    MessageMetadata metadata =
+                                            Commands.parseMessageMetadata(entry.getDataBuffer());
+                                    batchSizeFuture.complete(metadata.getNumMessagesInBatch());
+                                }
+                            } catch (Exception e) {
+                                batchSizeFuture.completeExceptionally(new RestException(e));
+                            }
+                        } finally {
+                            if (entry != null) {
+                                entry.release(); // always release the entry, whether or not parsing succeeded
+                            }
+                        }
+                    }
+                }, null);
+            } catch (NullPointerException npe) {
+                batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
+            } catch (Exception exception) {
+                log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
+                        clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
+                batchSizeFuture.completeExceptionally(new RestException(exception));
+            }
+        } else {
+            batchSizeFuture.complete(0); // no batch index given: complete with 0 without reading the entry
+        }
+    }
+
+    private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
+                                                 int batchIndex, MessageIdImpl messageId) {
+        PositionImpl seekPosition;
+        if (batchSize > 0) {
+            long[] ackSet;
+            // NOTE(review): comments below assume java.util.BitSet range semantics (end exclusive) - confirm
+            BitSetRecyclable bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchSize); // one bit per message of the batch, all set to start with
+            if (isExcluded) {
+                bitSet.clear(0, Math.max(batchIndex + 1, 0)); // clear bits 0..batchIndex
+                if (bitSet.length() > 0) { // some bit after batchIndex is still set
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else { // no bit left for this entry: use the next position instead
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+                    seekPosition = seekPosition.getNext();
+                }
+            } else {
+                if (batchIndex - 1 >= 0) { // i.e. batchIndex >= 1
+                    bitSet.clear(0, batchIndex); // clear bits before batchIndex, keep batchIndex itself
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else { // batchIndex <= 0: position of the entry without an ack set
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+                }
+            }
+            bitSet.recycle();
+        } else { // batchSize <= 0: the entry position itself, or the one after it when excluded
+            seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+            seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
+        }
+        return seekPosition;
+    }
+
     protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
                                               boolean authoritative) {
         try {
@@ -2743,16 +2758,15 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-
-    protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds,
-            boolean authoritative) {
+    protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName,
+                                                     int expireTimeInSeconds, boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (WebApplicationException wae) {
                 asyncResponse.resume(wae);
                 return;
@@ -2799,7 +2813,7 @@ public class PersistentTopicsBase extends AdminResource {
                 });
             } else {
                 try {
-                    internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                    internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
                 } catch (WebApplicationException wae) {
                     asyncResponse.resume(wae);
                     return;
@@ -2812,7 +2826,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds,
+    private void internalExpireMessagesByTimestampForSinglePartition(String subName, int expireTimeInSeconds,
             boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
@@ -2856,6 +2870,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    /** Expires messages on subscription {@code subName} up to the position given by {@code messageId}. */
+    protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
+                                                 MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages on subscription {} to position {}: {}", clientAppId(),
+                        topicName, subName, messageId, e.getMessage());
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+        log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName,
+                subName, messageId);
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+            log.warn("[{}] Not supported operation expire message up to {} on partitioned-topic {} {}",
+                    clientAppId(), messageId, topicName, subName);
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Expire message at position is not supported for partitioned-topic"));
+            return;
+        } else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
+            log.warn("[{}] Invalid parameter for expire message by position, partition index of passed in message"
+                            + " position {} doesn't match partition index of topic requested {}.",
+                    clientAppId(), messageId, topicName);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                    "Invalid parameter for expire message by position, partition index of message position "
+                            + "passed in doesn't match partition index for the topic."));
+        } else {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            validateReadOperationOnTopic(authoritative);
+            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            if (topic == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                return;
+            }
+            try {
+                PersistentSubscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                    return;
+                }
+                CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
+                getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
+                batchSizeFuture.thenAccept(bi -> {
+                    PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
+                    try {
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator)
+                                    topic.getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            repl.expireMessages(position);
+                        } else {
+                            sub.expireMessages(position);
+                        }
+                        log.info("[{}] Message expire issued up to {} on {} {}", clientAppId(), position, topicName,
+                                subName);
+                        // Reply with success only here, once the expiry was issued; every failure path replies itself.
+                        asyncResponse.resume(Response.noContent().build());
+                    } catch (NullPointerException npe) {
+                        throw new RestException(Status.NOT_FOUND, "Subscription not found");
+                    } catch (Exception exception) {
+                        log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}",
+                                clientAppId(), position, topicName, subName, exception);
+                        throw new RestException(exception);
+                    }
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(),
+                            messageId, topicName, subName, e);
+                    asyncResponse.resume(e);
+                    return null;
+                });
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}",
+                        clientAppId(), topicName, messageId, subName, messageId, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+            }
+        }
+    }
+
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index d33cac0..1ba1467 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -446,7 +446,46 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+                    ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+                    new MessageIdImpl(resetCursorData.getLedgerId(),
+                            resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+                    , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ae3d734..592cfa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1148,7 +1148,47 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+            ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+            new MessageIdImpl(resetCursorData.getLedgerId(),
+                    resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+            , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 1c05399..0cd5585 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -86,6 +86,8 @@ public interface Subscription {
 
     void expireMessages(int messageTTLInSeconds);
 
+    void expireMessages(Position position);
+
     void redeliverUnacknowledgedMessages(Consumer consumer);
 
     void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 635ce3e..bb1cb27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -434,6 +434,12 @@ public class NonPersistentSubscription implements Subscription {
         // No-op
     }
 
+    @Override
+    public void expireMessages(Position position) {
+        throw new UnsupportedOperationException("Expire message by position is not supported for"
+                + " non-persistent topic.");
+    }
+
     public NonPersistentSubscriptionStats getStats() {
         NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 502dd2f..df0b84c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedger
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.stats.Rate;
@@ -92,6 +93,32 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
         }
     }
 
+    public void expireMessages(Position messagePosition) {
+        // If it's beyond last position of this topic, do nothing.
+        if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
+            return;
+        }
+        if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+            log.info("[{}][{}] Starting message expiry check, position= {}", topicName, subName,
+                    messagePosition);
+
+            cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
+                try {
+                    // Match every entry whose position is at or before the given position.
+                    return ((PositionImpl) entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0;
+                } finally {
+                    entry.release();
+                }
+            }, this, null);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName,
+                        subName);
+            }
+        }
+    }
+
+
     public void updateRates() {
         msgExpired.calculateRate();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 35f8199..ec3397d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -712,6 +712,18 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         }
     }
 
+    /**
+     * Expires messages of the replication cursor up to the given position.
+     *
+     * <p>Nothing is done when the cursor has no backlog, or when the backlog is smaller than
+     * {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK} and the oldest message has not exceeded the message TTL.
+     *
+     * @param position
+     *            position handed to the expiry monitor
+     */
+    public void expireMessages(Position position) {
+        if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
+                || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+                && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
+            // don't do anything for almost caught-up connected subscriptions
+            return;
+        }
+        if (expiryMonitor != null) {
+            // Expire by the requested position; the TTL is only consulted by the guard above.
+            expiryMonitor.expireMessages(position);
+        }
+    }
+
     @Override
     public Optional<DispatchRateLimiter> getRateLimiter() {
         return dispatchRateLimiter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1aeced3..4a8186a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -894,6 +894,11 @@ public class PersistentSubscription implements Subscription {
         expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    /**
+     * Expires messages by position by delegating to this subscription's expiry monitor.
+     *
+     * @param position
+     *            position passed through to the expiry monitor
+     */
+    @Override
+    public void expireMessages(Position position) {
+        expiryMonitor.expireMessages(position);
+    }
+
     public double getExpiredMessageRate() {
         return expiryMonitor.getMessageExpiryRate();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 3f03da3..c2f1b11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -128,6 +128,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1574,16 +1575,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     long messageTimestamp = System.currentTimeMillis();
     long secondTimestamp = System.currentTimeMillis();
 
-    private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, false);
+    // Delegates to the four-argument overload with startIdx 0 and nullValue false, returning its message ids.
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
     }
 
-    private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, true);
+    // Delegates to the four-argument overload with startIdx 0 and nullValue true, returning its message ids.
+    private List<MessageId> publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
     }
 
-    private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
                                                   boolean nullValue) throws Exception {
+        List<MessageId> messageIds = new ArrayList<>();
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
@@ -1592,14 +1594,15 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             if (nullValue) {
-                producer.send(null);
+                messageIds.add(producer.send(null));
             } else {
                 String message = "message-" + i;
-                producer.send(message.getBytes());
+                messageIds.add(producer.send(message.getBytes()));
             }
         }
 
         producer.close();
+        return messageIds;
     }
 
     @Test
@@ -2040,7 +2043,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
      */
     @Test
     public void testPersistentTopicsExpireMessages() throws Exception {
-
         // Force to create a topic
         publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
         assertEquals(admin.topics().getList("prop-xyz/ns1"),
@@ -2060,21 +2062,21 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3);
 
-        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
+        List<MessageId> messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
 
         TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
 
-        Thread.sleep(1000); // wait for 1 seconds to expire message
+        Thread.sleep(1000);
         admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
-
+        // Wait at most 2 seconds for sub1's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
         assertEquals(subStats1.msgBacklog, 0);
-        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
         SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
         assertEquals(subStats2.msgBacklog, 10);
         assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
@@ -2082,24 +2084,69 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(subStats3.msgBacklog, 10);
         assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
-        admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
+        admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
+                messageIds.get(4), false);
+        // Wait at most 2 seconds for sub2's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L));
+        topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+        // Remember each subscription's own timestamp so the checks after the final expiry compare against
+        // a real earlier value instead of against themselves or another subscription.
+        long sub1lastMarkDeleteAdvancedTimestamp = subStats1.lastMarkDeleteAdvancedTimestamp;
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 5);
+        long sub2lastMarkDeleteAdvancedTimestamp = subStats2.lastMarkDeleteAdvancedTimestamp;
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 10);
+        assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
+        admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
+        // Wait at most 2 seconds for sub3's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L));
+        // Wait at most 2 seconds for rest of sub2's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-        SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
-        assertEquals(newSubStats1.msgBacklog, 0);
-        assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
-        assertEquals(newSubStats2.msgBacklog, 0);
-        assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
-        assertEquals(newSubStats3.msgBacklog, 0);
-        assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        // sub1 had nothing left to expire, so its mark-delete timestamp must be unchanged.
+        assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, sub1lastMarkDeleteAdvancedTimestamp);
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 0);
+        assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp);
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 0);
 
         consumer1.close();
         consumer2.close();
         consumer3.close();
+    }
 
+    /**
+     * Verifies that expiring by position is rejected when the message id carries a partition index
+     * (1) that differs from the one encoded in the topic name ("-partition-2").
+     */
+    @Test
+    public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
+        // Force to create a topic
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2"));
+
+        // create consumer and subscription
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
+                .topic("persistent://prop-xyz/ns1/ds2-partition-2")
+                .subscriptionType(SubscriptionType.Shared);
+        @Cleanup
+        Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();
+
+        assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
+        try {
+            admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
+                    new MessageIdImpl(1, 1, 1), false);
+            // Without this the test would pass silently if the request were accepted.
+            Assert.fail("Expire message by position with a mismatching partition index should be rejected");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0af9b1a..0fca6ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 36ca8e8..a2b8a1e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,13 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
@@ -29,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -49,12 +57,18 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+
 /**
  */
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -206,7 +220,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
      * @throws Exception
      */
     @Test
-    void testMessageExpiryWithNonRecoverableException() throws Exception {
+    void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
         final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
         final int entriesPerLedger = 2;
@@ -257,4 +271,74 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         factory.shutdown();
 
     }
+
+    /**
+     * Exercises position-based expiry on a real managed cursor: writes 30 entries across several
+     * ledgers, then checks how the cursor's mark-delete position reacts to expiring at, beyond,
+     * before and after the current mark-delete position.
+     */
+    @Test
+    void testMessageExpiryWithPosition() throws Exception {
+        final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
+        final int entriesPerLedger = 5;
+        final int totalEntries = 30;
+        List<PositionImpl> positions = new ArrayList<>();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        config.setAutoSkipNonRecoverableData(true);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+        PersistentSubscription subscription = mock(PersistentSubscription.class);
+        Topic topic = mock(Topic.class);
+        when(subscription.getTopic()).thenReturn(topic);
+
+        for (int i = 0; i < totalEntries; i++) {
+            positions.add((PositionImpl) ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
+        }
+        // The monitor compares the requested position against the topic's last position.
+        when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
+        PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
+                cursor.getName(), cursor, subscription));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
+
+        // Expire by position and verify mark delete position of cursor.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position beyond last position and nothing should happen.
+        monitor.expireMessages(PositionImpl.get(100, 100));
+        // The monitor returns before starting a search, so no find-entry callback may have fired.
+        verify(monitor, times(0)).findEntryComplete(any(), any());
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+
+        // Expire by position again and verify mark delete position of cursor didn't change.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position before current mark delete position and verify mark delete position of cursor didn't change.
+        monitor.expireMessages(positions.get(10));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position after current mark delete position and verify mark delete position of cursor move to new position.
+        monitor.expireMessages(positions.get(16));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId()));
+        clearInvocations(monitor);
+
+        cursor.close();
+        ledger.close();
+        factory.shutdown();
+    }
+
+    // NOTE(review): this test makes no assertions; it only prints the Entity wrapping a ResetCursorData
+    // and looks like leftover debugging code — confirm whether it should be removed or given assertions.
+    @Test
+    public void test() {
+        ResetCursorData resetCursorData = new ResetCursorData(1, 1);
+        resetCursorData.setExcluded(true);
+        System.out.println(Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index a7b63b2..f02f5e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -212,6 +213,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
         tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        tenantAdmin.topics().expireMessages(topicName, subscriptionName, new MessageIdImpl(-1, -1, -1), true);
         tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index d5c35a9..54d831e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1183,6 +1183,40 @@ public interface Topics {
             long expireTimeInSeconds);
 
     /**
+     * Expire all messages up to the given position (messageId) for a given subscription.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Will message at passed in position also be expired.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+            throws PulsarAdminException;
+
+    /**
+     * Expire all messages up to the given position (messageId) for a given subscription asynchronously.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Will message at passed in position also be expired.
+     * @return
+     *            A {@link CompletableFuture} that'll be completed when expire message is done.
+     */
+    CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+                                                MessageId messageId, boolean isExcluded);
+
+    /**
      * Expire all messages older than given N seconds for all subscriptions of the persistent-topic.
      *
      * @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 314a50b..550b7d4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -938,6 +938,33 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+            throws PulsarAdminException {
+        try {
+            expireMessagesAsync(topic, subscriptionName, messageId, isExcluded)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    /**
+     * Posts the target position and exclusion flag to the subscription's {@code expireMessages}
+     * endpoint and returns the future of that request.
+     */
+    @Override
+    public CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+                                                       MessageId messageId, boolean isExcluded) {
+        final TopicName topicName = validateTopic(topic);
+        final String encodedSubscription = Codec.encode(subscriptionName);
+        // Request body: the position to expire up to, plus whether that position is excluded.
+        final ResetCursorData requestBody = new ResetCursorData(messageId);
+        requestBody.setExcluded(isExcluded);
+        final WebTarget target = topicPath(topicName, "subscription", encodedSubscription, "expireMessages");
+        return asyncPostRequest(target, Entity.entity(requestBody, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
         try {
             expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7f0f33c..cf92d40 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -730,6 +730,11 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
         verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
 
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(admin);
+        cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
+        verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));
+
         cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
         verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index ce48ef4..5113daa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -104,10 +104,14 @@ abstract class CliCommand {
     }
 
+    /**
+     * Parses a {@code ledgerId:entryId} string into a message id, using -1 as the partition index.
+     *
+     * @throws PulsarAdminException if the string is not in {@code ledgerId:entryId} format
+     */
     static MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException {
+        return validateMessageIdString(resetMessageIdStr, -1);
+    }
+
+    static MessageId validateMessageIdString(String resetMessageIdStr, int partitionIndex) throws PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
             Preconditions.checkArgument(messageId.length == 2);
-            return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1);
+            return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), partitionIndex);
         } catch (Exception e) {
             throw new PulsarAdminException(
                     "Invalid message id (must be in format: ledgerId:entryId) value " + resetMessageIdStr);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index fc2944d..a0c17a7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -648,13 +651,35 @@ public class CmdTopics extends CmdBase {
                 "--subscription" }, description = "Subscription to be skip messages on", required = true)
         private String subName;
 
-        @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds", required = true)
-        private long expireTimeInSeconds;
+        @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds")
+        private long expireTimeInSeconds = -1;
+
+        // Position (ledgerId:entryId) used for position-based expiry; mutually exclusive with --expireTime.
+        @Parameter(names = { "--position",
+                "-p" }, description = "message position to expire messages up to (ledgerId:entryId)", required = false)
+        private String messagePosition;
+
+        // NOTE(review): the flag name and description speak of a "reset position" although this command
+        // expires messages; confirm the wording against how the broker treats the excluded position.
+        @Parameter(names = { "-e", "--exclude-reset-position" },
+                description = "Exclude the reset position, start consume messages from the next position.", required = false)
+        private boolean excludeResetPosition = false;
 
         @Override
         void run() throws PulsarAdminException {
+            if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
+                throw new ParameterException(String.format("Can't expire message by time and " +
+                        "by message position at the same time."));
+            }
             String topic = validateTopicName(params);
-            topics.expireMessages(topic, subName, expireTimeInSeconds);
+            if (expireTimeInSeconds >= 0) {
+                topics.expireMessages(topic, subName, expireTimeInSeconds);
+            } else if (isNotBlank(messagePosition)) {
+                int partitionIndex = TopicName.get(topic).getPartitionIndex();
+                MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
+                topics.expireMessages(topic, subName, messageId, excludeResetPosition);
+            } else {
+                throw new ParameterException(
+                        "Either time (--expireTime) or message position (--position) has to be provided" +
+                                " to expire messages");
+            }
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 9311fa0..1bc7b7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -49,10 +49,12 @@ public class ResetCursorData {
             this.ledgerId = batchMessageId.getLedgerId();
             this.entryId = batchMessageId.getEntryId();
             this.batchIndex = batchMessageId.getBatchIndex();
+            this.partitionIndex = batchMessageId.partitionIndex;
         } else if (messageId instanceof MessageIdImpl) {
             MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
             this.ledgerId = messageIdImpl.getLedgerId();
             this.entryId = messageIdImpl.getEntryId();
+            this.partitionIndex = messageIdImpl.partitionIndex;
         }  else if (messageId instanceof TopicMessageIdImpl) {
             throw new IllegalArgumentException("Not supported operation on partitioned-topic");
         }