You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/03 16:29:38 UTC

[GitHub] merlimat closed pull request #1153: Fix increase partitions

merlimat closed pull request #1153: Fix increase partitions
URL: https://github.com/apache/incubator-pulsar/pull/1153
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index da8bb0aba..a3b246e01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -24,6 +24,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,6 +78,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageId;
@@ -1504,78 +1506,43 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT
                 return;
             }
 
-            // get list of cursors name of partition-1
-            final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding();
-            final Set<Topic> topics = Sets.newConcurrentHashSet();
-            ((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName,
-                    new MetaStoreCallback<List<String>>() {
-
-                        @Override
-                        public void operationComplete(List<String> cursors,
-                                org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) {
-                            List<CompletableFuture<Void>> subscriptionCreationFuture = Lists.newArrayList();
-                            // create subscriptions for all new partition-topics
-                            cursors.forEach(cursor -> {
-                                String subName = Codec.decode(cursor);
-                                for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
-                                    final String topicName = dn.getPartition(i).toString();
-                                    CompletableFuture<Void> future = new CompletableFuture<>();
-                                    pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
-                                        // cache topic to close all of them after creating all subscriptions
-                                        topics.add(topic);
-                                        if (ex != null) {
-                                            log.warn("[{}] Failed to create topic {}", clientAppId(), topicName);
-                                            future.completeExceptionally(ex);
-                                            return null;
-                                        } else {
-                                            topic.createSubscription(subName).handle((sub, e) -> {
-                                                if (e != null) {
-                                                    log.warn("[{}] Failed to create subsciption {} {}", clientAppId(),
-                                                            topicName, subName);
-                                                    future.completeExceptionally(e);
-                                                    return null;
-                                                } else {
-                                                    log.info("[{}] Successfully created subsciption {} {}",
-                                                            clientAppId(), topicName, subName);
-                                                    future.complete(null);
-                                                    return null;
-                                                }
-                                            });
-                                            return null;
-                                        }
-                                    });
-                                    subscriptionCreationFuture.add(future);
-                                }
-                            });
-                            // wait for all subscriptions to be created
-                            FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> {
-                                // close all topics and then complete result future
-                                FutureUtil.waitForAll(
-                                        topics.stream().map(topic -> topic.close()).collect(Collectors.toList()))
-                                        .handle((closed, topicCloseException) -> {
-                                            if (topicCloseException != null) {
-                                                log.warn("Failed to close newly created partitioned topics for {} ", dn,
-                                                        topicCloseException);
-                                            }
-                                            if (subscriptionException != null) {
-                                                result.completeExceptionally(subscriptionException);
-                                            } else {
-                                                log.info("[{}] Successfully created new partitions {}", clientAppId(),
-                                                        dn.toString());
-                                                result.complete(null);
-                                            }
-                                            return null;
-                                        });
-                                return null;
-                            });
-                        }
+            PulsarAdmin admin;
+            try {
+                admin = pulsar().getAdminClient();
+            } catch (PulsarServerException e1) {
+                result.completeExceptionally(e1);
+                return;
+            }
 
-                        @Override
-                        public void operationFailed(MetaStoreException ex) {
-                            log.warn("[{}] Failed to get list of cursors of {}", clientAppId(), ledgerName);
-                            result.completeExceptionally(ex);
-                        }
+            admin.persistentTopics().getStatsAsync(dn.getPartition(0).toString()).thenAccept(stats -> {
+                stats.subscriptions.keySet().forEach(subscription -> {
+                    List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
+                    for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
+                        final String topicName = dn.getPartition(i).toString();
+
+                        subscriptionFutures.add(admin.persistentTopics().createSubscriptionAsync(topicName,
+                                subscription, MessageId.latest));
+                    }
+
+                    FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
+                        log.info("[{}] Successfully created new partitions {}", clientAppId(), dn);
+                        result.complete(null);
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), dn, ex);
+                        result.completeExceptionally(ex);
+                        return null;
                     });
+                });
+            }).exceptionally(ex -> {
+                if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
+                    // The first partition doesn't exist, so there are currently no subscriptions to recreate
+                    result.complete(null);
+                } else {
+                    log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), dn.getPartition(0), ex);
+                    result.completeExceptionally(ex);
+                }
+                return null;
+            });
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), dn.toString());
             result.completeExceptionally(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f8affa5d5..bbbb42e90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -126,13 +126,13 @@ public void cleanup() throws Exception {
      * 1. create a partitioned-topic
      * 2. update partitions with larger number of partitions
      * 3. verify: getPartitionedMetadata and check number of partitions
-     * 4. verify: this api creates existing subscription to new partitioned-topics 
-     *            so, message will not be lost in new partitions 
+     * 4. verify: this api creates existing subscription to new partitioned-topics
+     *            so, message will not be lost in new partitions
      *  a. start producer and produce messages
      *  b. check existing subscription for new topics and it should have backlog msgs
-     * 
+     *
      * </pre>
-     * 
+     *
      * @param topicName
      * @throws Exception
      */
@@ -224,6 +224,7 @@ public void testIncrementPartitionsOfTopic() throws Exception {
         consumer2.close();
     }
 
+
     /**
      * verifies admin api command for non-persistent topic.
      * It verifies: partitioned-topic, stats
@@ -280,10 +281,10 @@ private void publishMessagesOnTopic(String topicName, int messages, int startIdx
 
         producer.close();
     }
-    
+
     /**
      * verifies validation on persistent-policies
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -321,7 +322,7 @@ public void testSetPersistencepolicies() throws Exception {
 
     /**
      * validates update of persistent-policies reflects on managed-ledger and managed-cursor
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -360,7 +361,7 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
 
     /**
      * Verify unloading topic
-     * 
+     *
      * @throws Exception
      */
     @Test(dataProvider = "topicType")
@@ -412,14 +413,14 @@ private void unloadTopic(String topicName, boolean isPersistentTopic) throws Exc
 
     /**
      * Verifies reset-cursor at specific position using admin-api.
-     * 
+     *
      * <pre>
      * 1. Publish 50 messages
      * 2. Consume 20 messages
      * 3. reset cursor position on 10th message
      * 4. consume 40 messages from reset position
      * </pre>
-     * 
+     *
      * @param namespaceName
      * @throws Exception
      */
@@ -519,7 +520,7 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in
 
     /**
      * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -589,7 +590,7 @@ public void testPeerCluster() throws Exception {
 
     /**
      * It validates that peer-cluster can't coexist in replication-cluster list
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -709,14 +710,14 @@ public void namespaceAntiAffinity() throws PulsarAdminException {
         List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
         assertEquals(namespaces2.size(), 0);
     }
-    
+
     @Test
     public void testNonPersistentTopics() throws Exception {
         final String namespace = "prop-xyz/use/ns2";
         final String topicName = "non-persistent://" + namespace + "/topic";
         admin.namespaces().createNamespace(namespace, 20);
         int totalTopics = 100;
-        
+
         Set<String> topicNames = Sets.newHashSet();
         for (int i = 0; i < totalTopics; i++) {
             topicNames.add(topicName + i);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
new file mode 100644
index 000000000..779466a64
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
+
+    private MockedPulsarService mockPulsarSetup;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setLoadBalancerEnabled(true);
+        super.internalSetup();
+
+        // create another broker to test redirect on calls that need
+        // namespace ownership
+        mockPulsarSetup = new MockedPulsarService(this.conf);
+        mockPulsarSetup.setup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+        PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
+        admin.properties().createProperty("prop-xyz", propertyAdmin);
+        admin.namespaces().createNamespace("prop-xyz/use/ns1");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+        mockPulsarSetup.cleanup();
+    }
+
+    @Test
+    public void testIncrementPartitionsOfTopicOnUnusedTopic() throws Exception {
+        final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic";
+
+        admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10);
+        assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+
+        admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20);
+        assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+    }
+
+    @Test
+    public void testIncrementPartitionsOfTopic() throws Exception {
+        final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic-2";
+
+        admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10);
+        assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+
+        Consumer consumer = pulsarClient.subscribe(partitionedTopicName, "sub-1");
+
+        admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20);
+        assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+
+        assertEquals(
+                admin.persistentTopics()
+                        .getSubscriptions(DestinationName.get(partitionedTopicName).getPartition(15).toString()),
+                Lists.newArrayList("sub-1"));
+
+        consumer.close();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services