You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/02 17:00:01 UTC

[incubator-pulsar] branch master updated: Rest API for Ledger Offloading (#1639)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f678e0  Rest API for Ledger Offloading (#1639)
5f678e0 is described below

commit 5f678e099095b133138ea0e536e06e916d8dc09e
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed May 2 18:59:58 2018 +0200

    Rest API for Ledger Offloading (#1639)
    
    * Rest API for Ledger Offloading
    
    Implemented for both V1 and V2 topic name formats. The API takes a
    message ID, up to which the broker will try to offload messages. The
    status endpoint returns the message ID of the first message in the
    topic that has not been offloaded.
    
    This patch also adds basic support for setting the Offloader
    implementation in the broker (needed for testing). Subsequent patches
    will make this configurable through ServiceConfiguration.
    
    * Split offload endpoint into two
    
    One for triggering and one for getting the current status.
    
    * Add conflict (409) response to REST API doc
    
    * Fixed build
---
 .../org/apache/pulsar/broker/PulsarService.java    |   6 +
 .../broker/admin/impl/PersistentTopicsBase.java    |  24 +++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  37 +++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  35 ++++-
 .../pulsar/broker/service/BrokerService.java       |   2 +
 .../broker/service/persistent/PersistentTopic.java |  57 +++++++-
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 148 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  14 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |  10 +-
 .../client/admin/LongRunningProcessStatus.java     |  18 +--
 .../pulsar/client/admin/OffloadProcessStatus.java  |  55 ++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  19 ++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  30 ++++-
 .../pulsar/admin/cli/CmdPersistentTopics.java      |   7 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |   7 +-
 15 files changed, 426 insertions(+), 43 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c74ecd7..8d39fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -46,7 +46,9 @@ import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -635,6 +637,10 @@ public class PulsarService implements AutoCloseable {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
 
+    public LedgerOffloader getManagedLedgerOffloader() {
+        return NullLedgerOffloader.INSTANCE;
+    }
+
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ff50ec1..a354008 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -75,6 +76,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -86,7 +89,6 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -1110,12 +1112,30 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected CompactionStatus internalCompactionStatus(boolean authoritative) {
+    protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
         validateAdminOperationOnTopic(authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.compactionStatus();
     }
 
+    protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
+        validateAdminOperationOnTopic(authoritative);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        try {
+            topic.triggerOffload(messageId);
+        } catch (AlreadyRunningException e) {
+            throw new RestException(Status.CONFLICT, e.getMessage());
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
+    protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
+        validateAdminOperationOnTopic(authoritative);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        return topic.offloadStatus();
+    }
+
     public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
             String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 33f5bf9..1ebb79a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -38,9 +38,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -445,11 +446,43 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
                             @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
-    public CompactionStatus compactionStatus(
+    public LongRunningProcessStatus compactionStatus(
             @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         return internalCompactionStatus(authoritative);
     }
+
+    @PUT
+    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist"),
+                            @ApiResponse(code = 409, message = "Offload already running")})
+    public void triggerOffload(@PathParam("tenant") String tenant,
+                               @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace,
+                               @PathParam("topic") @Encoded String encodedTopic,
+                               @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+                               MessageIdImpl messageId) {
+        validateTopicName(tenant, cluster, namespace, encodedTopic);
+        internalTriggerOffload(authoritative, messageId);
+    }
+
+    @GET
+    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist")})
+    public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant,
+                                              @PathParam("cluster") String cluster,
+                                              @PathParam("namespace") String namespace,
+                                              @PathParam("topic") @Encoded String encodedTopic,
+                                              @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, cluster, namespace, encodedTopic);
+        return internalOffloadStatus(authoritative);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 2256d88..6a246b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -38,9 +38,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -430,11 +431,41 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
                             @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
-    public CompactionStatus compactionStatus(
+    public LongRunningProcessStatus compactionStatus(
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalCompactionStatus(authoritative);
     }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist"),
+                            @ApiResponse(code = 409, message = "Offload already running")})
+    public void triggerOffload(@PathParam("tenant") String tenant,
+                               @PathParam("namespace") String namespace,
+                               @PathParam("topic") @Encoded String encodedTopic,
+                               @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+                               MessageIdImpl messageId) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalTriggerOffload(authoritative, messageId);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist")})
+    public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant,
+                                              @PathParam("namespace") String namespace,
+                                              @PathParam("topic") @Encoded String encodedTopic,
+                                              @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        return internalOffloadStatus(authoritative);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 43f5000..115c30c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -722,6 +722,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
+            managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
+
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 574fab8..3cfddb0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -81,13 +82,14 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -173,6 +175,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
     final CompactedTopic compactedTopic;
 
+    CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
+            (MessageIdImpl)MessageId.earliest);
+
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
 
@@ -1683,23 +1688,61 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
     }
 
-
-    public synchronized CompactionStatus compactionStatus() {
+    public synchronized LongRunningProcessStatus compactionStatus() {
         final CompletableFuture<Long> current;
         synchronized (this) {
             current = currentCompaction;
         }
         if (!current.isDone()) {
-            return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING);
+            return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
         } else {
             try {
                 if (current.join() == COMPACTION_NEVER_RUN) {
-                    return CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN);
+                    return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                } else {
+                    return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                }
+            } catch (CancellationException | CompletionException e) {
+                return LongRunningProcessStatus.forError(e.getMessage());
+            }
+        }
+    }
+
+    public synchronized void triggerOffload(MessageIdImpl messageId) throws AlreadyRunningException {
+        if (currentOffload.isDone()) {
+            CompletableFuture<MessageIdImpl> promise = currentOffload = new CompletableFuture<>();
+            getManagedLedger().asyncOffloadPrefix(
+                    PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
+                    new OffloadCallback() {
+                        @Override
+                        public void offloadComplete(Position pos, Object ctx) {
+                            PositionImpl impl = (PositionImpl)pos;
+
+                            promise.complete(new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1));
+                        }
+
+                        @Override
+                        public void offloadFailed(ManagedLedgerException exception, Object ctx) {
+                            promise.completeExceptionally(exception);
+                        }
+                    }, null);
+        } else {
+            throw new AlreadyRunningException("Offload already in progress");
+        }
+    }
+
+    public synchronized OffloadProcessStatus offloadStatus() {
+        if (!currentOffload.isDone()) {
+            return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+        } else {
+            try {
+                if (currentOffload.join() == MessageId.earliest) {
+                    return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                 } else {
-                    return CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS);
+                    return OffloadProcessStatus.forSuccess(currentOffload.join());
                 }
             } catch (CancellationException | CompletionException e) {
-                return CompactionStatus.forError(e.getMessage());
+                return OffloadProcessStatus.forError(e.getMessage());
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
new file mode 100644
index 0000000..dc925a2
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.Sets;
+
+import java.util.concurrent.CompletableFuture;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AdminApiOffloadTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setManagedLedgerMaxEntriesPerLedger(10);
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void testOffload(String topicName, String mlName) throws Exception {
+        LedgerOffloader offloader = mock(LedgerOffloader.class);
+        doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        doReturn(promise).when(offloader).offload(anyObject(), anyObject(), anyObject());
+
+        MessageId currentId = MessageId.latest;
+        try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
+            for (int i = 0; i < 15; i++) {
+                currentId = p.send("Foobar".getBytes());
+            }
+        }
+
+        ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
+        Assert.assertEquals(info.ledgers.size(), 2);
+
+        Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.NOT_RUN);
+
+        admin.persistentTopics().triggerOffload(topicName, currentId);
+
+        Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.RUNNING);
+
+        try {
+            admin.persistentTopics().triggerOffload(topicName, currentId);
+            Assert.fail("Should have failed");
+        } catch (ConflictException e) {
+            // expected
+        }
+
+        // fail first time
+        promise.completeExceptionally(new Exception("Some random failure"));
+
+        Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.ERROR);
+        Assert.assertTrue(admin.persistentTopics().offloadStatus(topicName).lastError.contains("Some random failure"));
+
+        // Try again
+        doReturn(CompletableFuture.completedFuture(null))
+            .when(offloader).offload(anyObject(), anyObject(), anyObject());
+
+        admin.persistentTopics().triggerOffload(topicName, currentId);
+
+        Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.SUCCESS);
+        MessageIdImpl firstUnoffloaded = admin.persistentTopics().offloadStatus(topicName).firstUnoffloadedMessage;
+        // First unoffloaded is the first entry of current ledger
+        Assert.assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId);
+        Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+        verify(offloader, times(2)).offload(anyObject(), anyObject(), anyObject());
+    }
+
+
+    @Test
+    public void testOffloadV2() throws Exception {
+        String topicName = "persistent://prop-xyz/ns1/topic1";
+        String mlName = "prop-xyz/ns1/persistent/topic1";
+        testOffload(topicName, mlName);
+    }
+
+    @Test
+    public void testOffloadV1() throws Exception {
+        String topicName = "persistent://prop-xyz/test/ns1/topic2";
+        String mlName = "prop-xyz/test/ns1/persistent/topic2";
+        testOffload(topicName, mlName);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cd00d1c..c9d973d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -79,7 +80,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1959,7 +1959,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         pulsarClient.newProducer().topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.NOT_RUN);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.NOT_RUN);
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
@@ -1967,18 +1968,21 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.RUNNING);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.RUNNING);
 
         promise.complete(1L);
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.SUCCESS);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.SUCCESS);
 
         CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
         doReturn(errorPromise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
         errorPromise.completeExceptionally(new Exception("Failed at something"));
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.ERROR);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.ERROR);
         assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something"));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 7a514cd..d2fb996 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -70,7 +71,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1977,7 +1977,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.NOT_RUN);
+                     LongRunningProcessStatus.Status.NOT_RUN);
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
@@ -1986,12 +1986,12 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         admin.topics().triggerCompaction(topicName);
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.RUNNING);
+                     LongRunningProcessStatus.Status.RUNNING);
 
         promise.complete(1L);
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.SUCCESS);
+                     LongRunningProcessStatus.Status.SUCCESS);
 
         CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
         doReturn(errorPromise).when(compactor).compact(topicName);
@@ -1999,7 +1999,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         errorPromise.completeExceptionally(new Exception("Failed at something"));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.ERROR);
+                     LongRunningProcessStatus.Status.ERROR);
         assertTrue(admin.topics().compactionStatus(topicName)
                    .lastError.contains("Failed at something"));
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
similarity index 69%
rename from pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
index 9020c21..4a10289 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.compaction;
+package org.apache.pulsar.client.admin;
 
 /**
- * Status of compaction for a topic.
+ * Status of a long-running process.
  */
-public class CompactionStatus {
+public class LongRunningProcessStatus {
     public enum Status {
         NOT_RUN,
         RUNNING,
@@ -32,21 +32,21 @@ public class CompactionStatus {
     public Status status;
     public String lastError;
 
-    public CompactionStatus() {
+    public LongRunningProcessStatus() {
         this.status = Status.NOT_RUN;
         this.lastError = "";
     }
 
-    private CompactionStatus(Status status, String lastError) {
+    LongRunningProcessStatus(Status status, String lastError) {
         this.status = status;
         this.lastError = lastError;
     }
 
-    public static CompactionStatus forStatus(Status status) {
-        return new CompactionStatus(status, "");
+    public static LongRunningProcessStatus forStatus(Status status) {
+        return new LongRunningProcessStatus(status, "");
     }
 
-    public static CompactionStatus forError(String lastError) {
-        return new CompactionStatus(Status.ERROR, lastError);
+    public static LongRunningProcessStatus forError(String lastError) {
+        return new LongRunningProcessStatus(Status.ERROR, lastError);
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
new file mode 100644
index 0000000..0644ee1
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+/**
+ * Status of an offload process.
+ */
+public class OffloadProcessStatus extends LongRunningProcessStatus {
+
+    public MessageIdImpl firstUnoffloadedMessage;
+
+    public OffloadProcessStatus() {
+        super(Status.NOT_RUN, "");
+        firstUnoffloadedMessage = (MessageIdImpl)MessageId.earliest;
+    }
+
+    private OffloadProcessStatus(Status status, String lastError,
+                                 MessageIdImpl firstUnoffloadedMessage) {
+        this.status = status;
+        this.lastError = lastError;
+        this.firstUnoffloadedMessage = firstUnoffloadedMessage;
+    }
+
+    public static OffloadProcessStatus forStatus(Status status) {
+        return new OffloadProcessStatus(status, "", (MessageIdImpl)MessageId.earliest);
+    }
+
+    public static OffloadProcessStatus forError(String lastError) {
+        return new OffloadProcessStatus(Status.ERROR, lastError,
+                                        (MessageIdImpl)MessageId.earliest);
+    }
+
+    public static OffloadProcessStatus forSuccess(MessageIdImpl messageId) {
+        return new OffloadProcessStatus(Status.SUCCESS, "", messageId);
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2d629c3..fd0c082 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -981,5 +980,21 @@ public interface Topics {
      *
      * @param topic The topic whose compaction status we wish to check
      */
-    CompactionStatus compactionStatus(String topic) throws PulsarAdminException;
+    LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException;
+
+    /**
+     * Trigger offloading of messages in a topic to long-term storage.
+     *
+     * @param topic the topic to offload
+     * @param messageId ID of the message up to which messages should be offloaded
+     */
+    void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException;
+
+    /**
+     * Check the status of an ongoing offloading operation for a topic.
+     *
+     * @param topic the topic being offloaded
+     * @return the status of the offload operation
+     */
+    OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7a6b21c..25ad23d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -52,6 +52,8 @@ import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PersistentTopics;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -66,7 +68,6 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -794,12 +795,35 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
     }
 
     @Override
-    public CompactionStatus compactionStatus(String topic)
+    public LongRunningProcessStatus compactionStatus(String topic)
             throws PulsarAdminException {
         try {
             TopicName tn = validateTopic(topic);
             return request(topicPath(tn, "compaction"))
-                .get(CompactionStatus.class);
+                .get(LongRunningProcessStatus.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            WebTarget path = topicPath(tn, "offload");
+            request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), MessageIdImpl.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public OffloadProcessStatus offloadStatus(String topic)
+            throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            return request(topicPath(tn, "offload"))
+                .get(OffloadProcessStatus.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 266ebc4..dd9cf9c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PersistentTopics;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,7 +31,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
+
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -578,8 +579,8 @@ public class CmdPersistentTopics extends CmdBase {
             String persistentTopic = validatePersistentTopic(params);
 
             try {
-                CompactionStatus status = persistentTopics.compactionStatus(persistentTopic);
-                while (wait && status.status == CompactionStatus.Status.RUNNING) {
+                LongRunningProcessStatus status = persistentTopics.compactionStatus(persistentTopic);
+                while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
                     Thread.sleep(1000);
                     status = persistentTopics.compactionStatus(persistentTopic);
                 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2d486aa..1981c71 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
@@ -43,7 +44,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
+
 
 @Parameters(commandDescription = "Operations on persistent topics")
 public class CmdTopics extends CmdBase {
@@ -586,8 +587,8 @@ public class CmdTopics extends CmdBase {
             String persistentTopic = validatePersistentTopic(params);
 
             try {
-                CompactionStatus status = topics.compactionStatus(persistentTopic);
-                while (wait && status.status == CompactionStatus.Status.RUNNING) {
+                LongRunningProcessStatus status = topics.compactionStatus(persistentTopic);
+                while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
                     Thread.sleep(1000);
                     status = topics.compactionStatus(persistentTopic);
                 }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.