You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/26 03:35:07 UTC

[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17711: [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation.

mattisonchao commented on code in PR #17711:
URL: https://github.com/apache/pulsar/pull/17711#discussion_r979530556


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map
                 });
     }
 
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
+                HashMap<String, String> propertiesMap = new HashMap<>();

Review Comment:
   It's better to give map an init value, `mlInfo.getPropertiesCount()`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1408,14 +1408,38 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
         return topicFuture;
     }
 
+    CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
+        if (!topicName.isPartitioned()) {
+            return managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+        } else {
+            TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+            return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
+                    .thenCompose(metadata -> {
+                        if (metadata.partitions == 0) {
+                            return managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                                    topicName.getPersistenceNamingEncoding());
+                        }
+                        return CompletableFuture.completedFuture(metadata.properties);
+                    });
+        }
+    }
+
     private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> topicFuture,
                                        Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
                     if (isActive) {
-                        createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+                        CompletableFuture<Map<String, String>> propertiesFuture;
+                        if (properties == null) {
+                            //Read properties from storage when loading topic.
+                            propertiesFuture = fetchTopicPropertiesAsync(topicName);
+                        } else {
+                            propertiesFuture = CompletableFuture.completedFuture(properties);
+                        }
+                        propertiesFuture.thenAccept(finalProperties->

Review Comment:
   what about `propertiesFuture` got an exception?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -177,6 +178,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
     @Getter
     private volatile List<String> shadowTopics;
+    @Getter

Review Comment:
   I'd like to add get method return Optional to avoid potential NPE. :)



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map
                 });
     }
 
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
+                HashMap<String, String> propertiesMap = new HashMap<>();
+                if (mlInfo.getPropertiesCount() > 0) {
+                    for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+                        MLDataFormats.KeyValue property = mlInfo.getProperties(i);
+                        propertiesMap.put(property.getKey(), property.getValue());
+                    }
+                }
+                result.complete(propertiesMap);
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                if (e instanceof MetadataNotFoundException) {
+                    result.complete(null);

Review Comment:
   If we don't want to return the `MetadataNotFoundException` exception out, we can give it the empty map, which will avoid NPE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org