You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/03 16:29:36 UTC
[incubator-pulsar] branch master updated: Fixed increment
partitions operation (#1153)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d482c05 Fixed increment partitions operation (#1153)
d482c05 is described below
commit d482c0518f06204079024d2890a7fda3ac1b7050
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Feb 3 08:29:33 2018 -0800
Fixed increment partitions operation (#1153)
---
.../pulsar/broker/admin/PersistentTopics.java | 107 +++++++--------------
.../apache/pulsar/broker/admin/AdminApiTest2.java | 29 +++---
.../broker/admin/IncrementPartitionsTest.java | 95 ++++++++++++++++++
3 files changed, 147 insertions(+), 84 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index da8bb0a..a3b246e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,6 +78,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
@@ -1504,78 +1506,43 @@ public class PersistentTopics extends AdminResource {
return;
}
- // get list of cursors name of partition-1
- final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding();
- final Set<Topic> topics = Sets.newConcurrentHashSet();
- ((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName,
- new MetaStoreCallback<List<String>>() {
-
- @Override
- public void operationComplete(List<String> cursors,
- org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) {
- List<CompletableFuture<Void>> subscriptionCreationFuture = Lists.newArrayList();
- // create subscriptions for all new partition-topics
- cursors.forEach(cursor -> {
- String subName = Codec.decode(cursor);
- for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
- final String topicName = dn.getPartition(i).toString();
- CompletableFuture<Void> future = new CompletableFuture<>();
- pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
- // cache topic to close all of them after creating all subscriptions
- topics.add(topic);
- if (ex != null) {
- log.warn("[{}] Failed to create topic {}", clientAppId(), topicName);
- future.completeExceptionally(ex);
- return null;
- } else {
- topic.createSubscription(subName).handle((sub, e) -> {
- if (e != null) {
- log.warn("[{}] Failed to create subsciption {} {}", clientAppId(),
- topicName, subName);
- future.completeExceptionally(e);
- return null;
- } else {
- log.info("[{}] Successfully created subsciption {} {}",
- clientAppId(), topicName, subName);
- future.complete(null);
- return null;
- }
- });
- return null;
- }
- });
- subscriptionCreationFuture.add(future);
- }
- });
- // wait for all subscriptions to be created
- FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> {
- // close all topics and then complete result future
- FutureUtil.waitForAll(
- topics.stream().map(topic -> topic.close()).collect(Collectors.toList()))
- .handle((closed, topicCloseException) -> {
- if (topicCloseException != null) {
- log.warn("Failed to close newly created partitioned topics for {} ", dn,
- topicCloseException);
- }
- if (subscriptionException != null) {
- result.completeExceptionally(subscriptionException);
- } else {
- log.info("[{}] Successfully created new partitions {}", clientAppId(),
- dn.toString());
- result.complete(null);
- }
- return null;
- });
- return null;
- });
- }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e1) {
+ result.completeExceptionally(e1);
+ return;
+ }
- @Override
- public void operationFailed(MetaStoreException ex) {
- log.warn("[{}] Failed to get list of cursors of {}", clientAppId(), ledgerName);
- result.completeExceptionally(ex);
- }
+ admin.persistentTopics().getStatsAsync(dn.getPartition(0).toString()).thenAccept(stats -> {
+ stats.subscriptions.keySet().forEach(subscription -> {
+ List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
+ for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
+ final String topicName = dn.getPartition(i).toString();
+
+ subscriptionFutures.add(admin.persistentTopics().createSubscriptionAsync(topicName,
+ subscription, MessageId.latest));
+ }
+
+ FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
+ log.info("[{}] Successfully created new partitions {}", clientAppId(), dn);
+ result.complete(null);
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), dn, ex);
+ result.completeExceptionally(ex);
+ return null;
});
+ });
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
+ // The first partition doesn't exist, so there are currently no subscriptions to recreate
+ result.complete(null);
+ } else {
+ log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), dn.getPartition(0), ex);
+ result.completeExceptionally(ex);
+ }
+ return null;
+ });
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), dn.toString());
result.completeExceptionally(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f8affa5..bbbb42e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -126,13 +126,13 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
* 1. create a partitioned-topic
* 2. update partitions with larger number of partitions
* 3. verify: getPartitionedMetadata and check number of partitions
- * 4. verify: this api creates existing subscription to new partitioned-topics
- * so, message will not be lost in new partitions
+ * 4. verify: this api creates existing subscription to new partitioned-topics
+ * so, message will not be lost in new partitions
* a. start producer and produce messages
* b. check existing subscription for new topics and it should have backlog msgs
- *
+ *
* </pre>
- *
+ *
* @param topicName
* @throws Exception
*/
@@ -224,6 +224,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
consumer2.close();
}
+
/**
* verifies admin api command for non-persistent topic.
* It verifies: partitioned-topic, stats
@@ -280,10 +281,10 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
producer.close();
}
-
+
/**
* verifies validation on persistent-policies
- *
+ *
* @throws Exception
*/
@Test
@@ -321,7 +322,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
/**
* validates update of persistent-policies reflects on managed-ledger and managed-cursor
- *
+ *
* @throws Exception
*/
@Test
@@ -360,7 +361,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
/**
* Verify unloading topic
- *
+ *
* @throws Exception
*/
@Test(dataProvider = "topicType")
@@ -412,14 +413,14 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
/**
* Verifies reset-cursor at specific position using admin-api.
- *
+ *
* <pre>
* 1. Publish 50 messages
* 2. Consume 20 messages
* 3. reset cursor position on 10th message
* 4. consume 40 messages from reset position
* </pre>
- *
+ *
* @param namespaceName
* @throws Exception
*/
@@ -519,7 +520,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
/**
* It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
- *
+ *
* @throws Exception
*/
@Test
@@ -589,7 +590,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
/**
* It validates that peer-cluster can't coexist in replication-cluster list
- *
+ *
* @throws Exception
*/
@Test
@@ -709,14 +710,14 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
assertEquals(namespaces2.size(), 0);
}
-
+
@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/use/ns2";
final String topicName = "non-persistent://" + namespace + "/topic";
admin.namespaces().createNamespace(namespace, 20);
int totalTopics = 100;
-
+
Set<String> topicNames = Sets.newHashSet();
for (int i = 0; i < totalTopics; i++) {
topicNames.add(topicName + i);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
new file mode 100644
index 0000000..779466a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
+
+ private MockedPulsarService mockPulsarSetup;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ conf.setLoadBalancerEnabled(true);
+ super.internalSetup();
+
+ // create another broker to test redirects on calls that need
+ // namespace ownership
+ mockPulsarSetup = new MockedPulsarService(this.conf);
+ mockPulsarSetup.setup();
+
+ // Setup namespaces
+ admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+ PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
+ admin.properties().createProperty("prop-xyz", propertyAdmin);
+ admin.namespaces().createNamespace("prop-xyz/use/ns1");
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ mockPulsarSetup.cleanup();
+ }
+
+ @Test
+ public void testIncrementPartitionsOfTopicOnUnusedTopic() throws Exception {
+ final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic";
+
+ admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10);
+ assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+
+ admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20);
+ assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+ }
+
+ @Test
+ public void testIncrementPartitionsOfTopic() throws Exception {
+ final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic-2";
+
+ admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10);
+ assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+
+ Consumer consumer = pulsarClient.subscribe(partitionedTopicName, "sub-1");
+
+ admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20);
+ assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+
+ assertEquals(
+ admin.persistentTopics()
+ .getSubscriptions(DestinationName.get(partitionedTopicName).getPartition(15).toString()),
+ Lists.newArrayList("sub-1"));
+
+ consumer.close();
+ }
+}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.