You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/10/03 06:42:21 UTC

[pulsar] branch master updated: [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11482048d35 [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)
11482048d35 is described below

commit 11482048d357ccb4e4f1802304a7dd0bfd7b9c26
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Oct 3 14:42:09 2022 +0800

    [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  7 ++
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  5 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 17 ++--
 .../apache/bookkeeper/mledger/impl/MetaStore.java  |  9 ++
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     | 29 +++++++
 .../mledger/impl/ShadowManagedLedgerImpl.java      | 58 +++++++++++++
 .../pulsar/broker/service/BrokerService.java       | 32 ++++++-
 .../broker/service/persistent/PersistentTopic.java | 12 +++
 .../pulsar/broker/service/PersistentTopicTest.java |  2 +
 .../broker/service/persistent/ShadowTopicTest.java | 97 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 54 ++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 40 ++++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 19 ++++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 34 ++++++++
 14 files changed, 405 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 92c9c911981..0efd1ca2a82 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
 
 /**
@@ -742,4 +743,10 @@ public class ManagedLedgerConfig {
     public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) {
         this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
     }
+
+    public String getShadowSource() {
+        return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
+    }
+
+    public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE";
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index e42c2581ba1..667d641ac9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -197,4 +198,8 @@ public interface ManagedLedgerFactory {
      * */
     long getCacheEvictionTimeThreshold();
 
+    /**
+     * @return properties of the managed ledger with the given name.
+     */
+    CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d7596a7468a..e0af1cc6326 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -367,11 +367,13 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
-            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
-                    bookkeeperFactory.get(
-                            new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                                    config.getBookKeeperEnsemblePlacementPolicyProperties())),
-                    store, config, scheduledExecutor, name, mlOwnershipChecker);
+            BookKeeper bk = bookkeeperFactory.get(
+                    new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                            config.getBookKeeperEnsemblePlacementPolicyProperties()));
+            final ManagedLedgerImpl newledger = config.getShadowSource() == null
+                    ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)
+                    : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
+                    mlOwnershipChecker);
             PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
             pendingInitializeLedgers.put(name, pendingLedger);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@@ -954,6 +956,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         return future;
     }
 
+    @Override
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        return store.getManagedLedgerPropertiesAsync(name);
+    }
+
     public MetaStore getMetaStore() {
         return store;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 35f109b21dc..3d60066782a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -158,4 +158,13 @@ public interface MetaStore {
      *         if the operation succeeds.
      */
     CompletableFuture<Boolean> asyncExists(String ledgerName);
+
+
+    /**
+     * Get managed ledger properties from meta store.
+     *
+     * @param name ledgerName
+     * @return a future that represents the result of the operation.
+     */
+    CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index a501b9e43dc..2949902ac35 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -148,6 +150,33 @@ public class MetaStoreImpl implements MetaStore {
                 });
     }
 
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
+                HashMap<String, String> propertiesMap = new HashMap<>(mlInfo.getPropertiesCount());
+                if (mlInfo.getPropertiesCount() > 0) {
+                    for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+                        MLDataFormats.KeyValue property = mlInfo.getProperties(i);
+                        propertiesMap.put(property.getKey(), property.getValue());
+                    }
+                }
+                result.complete(propertiesMap);
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                if (e instanceof MetadataNotFoundException) {
+                    result.complete(Collections.emptyMap());
+                } else {
+                    result.completeExceptionally(e);
+                }
+            }
+        });
+        return result;
+    }
+
     @Override
     public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
             MetaStoreCallback<Void> callback) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
new file mode 100644
index 00000000000..346780a2283
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.impl;
+
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Work in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished.
+ * Currently, it behaves no differently from ManagedLedgerImpl.
+ */
+@Slf4j
+public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
+
+    private final TopicName shadowSource;
+    private final String sourceMLName;
+
+    public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
+                                   MetaStore store, ManagedLedgerConfig config,
+                                   OrderedScheduler scheduledExecutor,
+                                   String name, final Supplier<Boolean> mlOwnershipChecker) {
+        super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
+        this.shadowSource = TopicName.get(config.getShadowSource());
+        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+    }
+
+    @Override
+    synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
+        // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger,
+        //  which is complicated and will be implemented in the next PRs.
+        super.initialize(callback, ctx);
+    }
+
+    public TopicName getShadowSource() {
+        return shadowSource;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8491615448a..2f6cb020b70 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1405,6 +1405,22 @@ public class BrokerService implements Closeable {
         return topicFuture;
     }
 
+    CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
+        if (!topicName.isPartitioned()) {
+            return managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+        } else {
+            TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+            return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
+                    .thenCompose(metadata -> {
+                        if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) {
+                            return managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                                    topicName.getPersistenceNamingEncoding());
+                        }
+                        return CompletableFuture.completedFuture(metadata.properties);
+                    });
+        }
+    }
+
     private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> topicFuture,
                                        Map<String, String> properties) {
@@ -1412,7 +1428,21 @@ public class BrokerService implements Closeable {
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
                     if (isActive) {
-                        createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+                        CompletableFuture<Map<String, String>> propertiesFuture;
+                        if (properties == null) {
+                            //Read properties from storage when loading topic.
+                            propertiesFuture = fetchTopicPropertiesAsync(topicName);
+                        } else {
+                            propertiesFuture = CompletableFuture.completedFuture(properties);
+                        }
+                        propertiesFuture.thenAccept(finalProperties ->
+                                createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties)
+                        ).exceptionally(throwable -> {
+                            log.warn("[{}] Read topic property failed", topic, throwable);
+                            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                            topicFuture.completeExceptionally(throwable);
+                            return null;
+                        });
                     } else {
                         // namespace is being unloaded
                         String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fb1c521bb00..a3193935e53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -176,6 +177,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
     @Getter
     private volatile List<String> shadowTopics;
+    private final TopicName shadowSourceTopic;
 
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
     private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
@@ -302,6 +304,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             this.transactionBuffer = new TransactionBufferDisable();
         }
         transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
+        if (ledger instanceof ShadowManagedLedgerImpl) {
+            shadowSourceTopic = ((ShadowManagedLedgerImpl) ledger).getShadowSource();
+        } else {
+            shadowSourceTopic = null;
+        }
     }
 
     @Override
@@ -381,6 +388,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         } else {
             this.transactionBuffer = new TransactionBufferDisable();
         }
+        shadowSourceTopic = null;
     }
 
     private void initializeDispatchRateLimiterIfNeeded() {
@@ -3330,4 +3338,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public long getLastDataMessagePublishedTimestamp() {
         return lastDataMessagePublishedTimestamp;
     }
+
+    public Optional<TopicName> getShadowSourceTopic() {
+        return Optional.ofNullable(shadowSourceTopic);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 970bfd763a4..2771808a9fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -201,6 +201,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
         doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
 
+        doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
+                .when(mlFactoryMock).getManagedLedgerPropertiesAsync(any());
         doAnswer(invocation -> {
             DeleteLedgerCallback deleteLedgerCallback = invocation.getArgument(1);
             deleteLedgerCallback.deleteLedgerComplete(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
new file mode 100644
index 00000000000..22bfd70cf88
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ShadowTopicTest extends BrokerTestBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test()
+    public void testNonPartitionedShadowTopicSetup() throws Exception {
+        String sourceTopic = "persistent://prop/ns-abc/source";
+        String shadowTopic = "persistent://prop/ns-abc/shadow";
+        //1. test shadow topic setting in topic creation.
+        admin.topics().createNonPartitionedTopic(sourceTopic);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        PersistentTopic brokerShadowTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+        Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
+        Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic);
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+
+        //2. test shadow topic could be properly loaded after unload.
+        admin.namespaces().unload("prop/ns-abc");
+        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+        brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+        Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
+        Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic);
+    }
+
+    @Test()
+    public void testPartitionedShadowTopicSetup() throws Exception {
+        String sourceTopic = "persistent://prop/ns-abc/source-p";
+        String shadowTopic = "persistent://prop/ns-abc/shadow-p";
+        String shadowTopicPartition = TopicName.get(shadowTopic).getPartition(0).toString();
+
+        //1. test shadow topic setting in topic creation.
+        admin.topics().createPartitionedTopic(sourceTopic, 2);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        pulsarClient.newProducer().topic(shadowTopic).create().close();//trigger loading partitions.
+        PersistentTopic brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(shadowTopicPartition).get().get();
+        Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
+        Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic);
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+
+        //2. test shadow topic could be properly loaded after unload.
+        admin.namespaces().unload("prop/ns-abc");
+        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+        brokerShadowTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get();
+        Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
+        Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic);
+    }
+
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 177cae9a9a4..d2d0754f1d1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -4397,4 +4397,58 @@ public interface Topics {
      * @param sourceTopic source topic name
      */
     CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic);
+
+    /**
+     * Get the shadow source topic name of the given shadow topic.
+     * @param shadowTopic shadow topic name.
+     * @return The topic name of the source of the shadow topic.
+     */
+    String getShadowSource(String shadowTopic) throws PulsarAdminException;
+
+    /**
+     * Get the shadow source topic name of the given shadow topic asynchronously.
+     * @param shadowTopic shadow topic name.
+     * @return The topic name of the source of the shadow topic.
+     */
+    CompletableFuture<String> getShadowSourceAsync(String shadowTopic);
+
+    /**
+     * Create a new shadow topic as the shadow of the source topic.
+     * The source topic must exist before calling this method.
+     * <p>
+     *     For partitioned source topic, the partition number of shadow topic follows the source topic at creation. If
+     *     the partition number of the source topic changes, the shadow topic needs to update its partition number
+     *     manually.
+     *     For non-partitioned source topic, the shadow topic will be created as non-partitioned topic.
+     * </p>
+     *
+     * NOTE: This is still WIP until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished.
+     *
+     * @param shadowTopic shadow topic name, and it must be a persistent topic name.
+     * @param sourceTopic source topic name, and it must be a persistent topic name.
+     * @param properties properties to create the shadow topic with.
+     * @throws PulsarAdminException
+     */
+    void createShadowTopic(String shadowTopic, String sourceTopic, Map<String, String> properties)
+            throws PulsarAdminException;
+
+    /**
+     * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+     */
+    CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic,
+                                                   Map<String, String> properties);
+
+    /**
+     * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+     */
+    default void createShadowTopic(String shadowTopic, String sourceTopic) throws PulsarAdminException {
+        createShadowTopic(shadowTopic, sourceTopic, null);
+    }
+
+    /**
+     * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+     */
+    default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) {
+        return createShadowTopicAsync(shadowTopic, sourceTopic, null);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e3b51accdfd..4312080ac22 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -131,6 +131,8 @@ public class TopicsImpl extends BaseResource implements Topics {
     private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys";
     // CHECKSTYLE.ON: MemberName
 
+    public static final String PROPERTY_SHADOW_SOURCE_KEY = "PULSAR.SHADOW_SOURCE";
+
     public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
         super(auth, readTimeoutMs);
         adminTopics = web.path("/admin");
@@ -2705,7 +2707,43 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic) {
         TopicName tn = validateTopic(sourceTopic);
         WebTarget path = topicPath(tn, "shadowTopics");
-        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
+        return asyncGetRequest(path, new FutureCallback<List<String>>() {});
+    }
+
+    @Override
+    public String getShadowSource(String shadowTopic) throws PulsarAdminException {
+        return sync(() -> getShadowSourceAsync(shadowTopic));
+    }
+
+    @Override
+    public CompletableFuture<String> getShadowSourceAsync(String shadowTopic) {
+        return getPropertiesAsync(shadowTopic).thenApply(
+                properties -> properties != null ? properties.get(PROPERTY_SHADOW_SOURCE_KEY) : null);
+    }
+
+    @Override
+    public void createShadowTopic(String shadowTopic, String sourceTopic, Map<String, String> properties)
+            throws PulsarAdminException {
+        sync(() -> createShadowTopicAsync(shadowTopic, sourceTopic, properties));
+    }
+
+    @Override
+    public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic,
+                                                          Map<String, String> properties) {
+        checkArgument(TopicName.get(shadowTopic).isPersistent(), "Shadow topic must be persistent");
+        checkArgument(TopicName.get(sourceTopic).isPersistent(), "Source topic must be persistent");
+        return getPartitionedTopicMetadataAsync(sourceTopic).thenCompose(sourceTopicMeta -> {
+            HashMap<String, String> shadowProperties = new HashMap<>();
+            if (properties != null) {
+                shadowProperties.putAll(properties);
+            }
+            shadowProperties.put(PROPERTY_SHADOW_SOURCE_KEY, sourceTopic);
+            if (sourceTopicMeta.partitions == PartitionedTopicMetadata.NON_PARTITIONED) {
+                return createNonPartitionedTopicAsync(shadowTopic, shadowProperties);
+            } else {
+                return createPartitionedTopicAsync(shadowTopic, sourceTopicMeta.partitions, shadowProperties);
+            }
+        });
     }
 
     private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index b9cfbe8c97e..bcb1fca1705 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
@@ -51,7 +50,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.Bookies;
@@ -74,7 +72,6 @@ import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -1946,6 +1943,22 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-shadow-topics persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeShadowTopics("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("create-shadow-topic -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", null);
+
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("create-shadow-topic -p a=aa,b=bb,c=cc -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1"));
+        HashMap<String, String> p = new HashMap<>();
+        p.put("a","aa");
+        p.put("b","bb");
+        p.put("c","cc");
+        verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", p);
+
+        cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1");
+
+
+
     }
 
     private static LedgerInfo newLedger(long id, long entries, long size) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index ae37a591bae..245cfc1b855 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -255,6 +255,8 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-shadow-topics", new GetShadowTopics());
         jcommander.addCommand("set-shadow-topics", new SetShadowTopics());
         jcommander.addCommand("remove-shadow-topics", new RemoveShadowTopics());
+        jcommander.addCommand("create-shadow-topic", new CreateShadowTopic());
+        jcommander.addCommand("get-shadow-source", new GetShadowSource());
 
         jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
         jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());
@@ -1714,6 +1716,38 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Create a shadow topic for an existing source topic.")
+    private class CreateShadowTopic extends CliCommand {
+        // Positional argument: the shadow topic to create; checked by validateTopicName(...) in run().
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+        // Mandatory source topic; run() normalizes it via TopicName.get(...).toString() before the admin call.
+        @Parameter(names = {"--source", "-s"}, description = "source topic name", required = true)
+        private String sourceTopic;
+        // Optional key=value entries, comma-separated in one argument (e.g. -p a=aa,b=bb,c=cc).
+        @Parameter(names = {"--properties", "-p"}, description = "key value pair properties(eg: a=a,b=b,c=c)")
+        private java.util.List<String> propertyList;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            Map<String, String> properties = parseListKeyValueMap(propertyList);
+            getTopics().createShadowTopic(topic, TopicName.get(sourceTopic).toString(), properties);
+        }
+    }
+
+    @Parameters(commandDescription = "Get the source topic for a shadow topic")
+    private class GetShadowSource extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+        // Validates the positional topic argument first, then prints what getShadowSource(...) returns for it.
+        @Override
+        void run() throws PulsarAdminException {
+            final String topicName = validatePersistentTopic(params);
+            print(getTopics().getShadowSource(topicName));
+        }
+    }
+
     @Parameters(commandDescription = "Get the delayed delivery policy for a topic")
     private class GetDelayedDelivery extends CliCommand {
         @Parameter(description = "tenant/namespace/topic", required = true)