You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 08:01:08 UTC
[pulsar] branch master updated: [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7164561dfc5 [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
7164561dfc5 is described below
commit 7164561dfc50cefb3f13e7ab62f3408e6066c059
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Apr 29 16:01:00 2022 +0800
[Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 14 +++++++
.../pulsar/broker/admin/v3/Transactions.java | 25 ++++++++++++
.../pulsar/broker/admin/AdminApiTlsAuthTest.java | 20 +++++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/admin/Transactions.java | 15 +++++++
.../client/admin/internal/TransactionsImpl.java | 16 ++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++
.../apache/pulsar/admin/cli/CmdTransactions.java | 12 ++++++
8 files changed, 153 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index bbd2e0be9c3..f1631e0751a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -432,4 +433,17 @@ public abstract class TransactionsBase extends AdminResource {
throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
}
}
+
+ protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
+ return validateSuperUserAccessAsync()
+ .thenCompose((ignore) -> namespaceResources().getPartitionedTopicResources()
+ .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+ if (p.partitions >= replicas) {
+ throw new RestException(Response.Status.NOT_ACCEPTABLE,
+ "Number of transaction coordinators should "
+ + "be more than the current number of transaction coordinator");
+ }
+ return new PartitionedTopicMetadata(replicas);
+ }));
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 7b2c99ff9d5..48499134361 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -29,6 +29,7 @@ import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -36,6 +37,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -314,4 +316,27 @@ public class Transactions extends TransactionsBase {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
}
+
+ @POST
+ @Path("/transactionCoordinator/replicas")
+ @ApiResponses(value = {
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 406, message = "The number of replicas should be more than "
+ + "the current number of transaction coordinator replicas"),
+ @ApiResponse(code = 401, message = "This operation requires super-user access")})
+ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncResponse, int replicas) {
+ try {
+ checkTransactionCoordinatorEnabled();
+ internalScaleTransactionCoordinators(replicas)
+ .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(e -> {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("{} Failed to update the scale of transaction coordinators", clientAppId());
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index 311e31be735..fcf68c907bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -48,6 +48,8 @@ import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.tls.NoopHostnameVerifier;
@@ -195,6 +197,24 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testSuperUserCanUpdateScaleOfTransactionCoordinators() throws Exception {
+ getPulsar().getConfiguration().setTransactionCoordinatorEnabled(true);
+ pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+ .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(3));
+ PulsarAdmin admin = buildAdminClient("admin");
+ admin.transactions().scaleTransactionCoordinators(4);
+ int partitions = pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+ .getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
+ .get().get().partitions;
+ Assert.assertEquals(partitions, 4);
+ }
+
@Test
public void testProxyRoleCantDeleteResourceGroups() throws Exception {
try (PulsarAdmin admin = buildAdminClient("admin")) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index d46b640aac3..34492b3ca34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -60,6 +61,9 @@ import org.testng.annotations.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -519,6 +523,49 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
} catch (PulsarAdminException ex) {
assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
}
+ try {
+ admin.transactions().scaleTransactionCoordinators(1);
+ } catch (PulsarAdminException ex) {
+ assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
+ }
+ }
+
+ @Test
+ public void testUpdateTransactionCoordinatorNumber() throws Exception {
+ int coordinatorSize = 3;
+ pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+ .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(coordinatorSize));
+ try {
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize - 1);
+ fail();
+ } catch (PulsarAdminException pulsarAdminException) {
+ assertEquals(pulsarAdminException.getStatusCode(), HttpStatus.SC_NOT_ACCEPTABLE);
+ }
+ try {
+ admin.transactions().scaleTransactionCoordinators(-1);
+ fail();
+ } catch (PulsarAdminException pulsarAdminException) {
+ assertEquals(pulsarAdminException.getCause().getMessage(),
+ "Number of transaction coordinators must be more than 0");
+ }
+
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2);
+ pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+ pulsarClient.close();
+ Awaitility.await().until(() -> pulsar.getTransactionMetadataStoreService().getStores().size() ==
+ coordinatorSize * 2);
+ pulsar.getConfiguration().setAuthenticationEnabled(true);
+ Set<String> proxyRoles = spy(Set.class);
+ doReturn(true).when(proxyRoles).contains(any());
+ pulsar.getConfiguration().setProxyRoles(proxyRoles);
+ try {
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2 + 1);
+ fail();
+ } catch (PulsarAdminException.NotAuthorizedException ignored) {
+ }
}
private static void verifyCoordinatorStats(String state,
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index b082f1c1780..803f50f8bdd 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -243,4 +243,19 @@ public interface Transactions {
TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, String subName,
boolean metadata) throws PulsarAdminException;
+ /**
+ * Sets the scale of the transaction coordinators.
+ * And currently, we can only support scale-up.
+ * @param replicas the new transaction coordinators size.
+ */
+ void scaleTransactionCoordinators(int replicas) throws PulsarAdminException;
+
+ /**
+ * Asynchronously sets the size of the transaction coordinators.
+ * And currently, we can only support scale-up.
+ * @param replicas the new transaction coordinators size.
+ * @return a future that can be used to track when the transaction coordinator number is updated.
+ */
+ CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 46262d20531..d31ae9cfeaf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.client.admin.internal;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Authentication;
@@ -335,4 +338,17 @@ public class TransactionsImpl extends BaseResource implements Transactions {
return sync(() -> getPendingAckInternalStatsAsync(topic, subName, metadata));
}
+ @Override
+ public void scaleTransactionCoordinators(int replicas) throws PulsarAdminException {
+ sync(() -> scaleTransactionCoordinatorsAsync(replicas));
+ }
+
+ @Override
+ public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas) {
+ checkArgument(replicas > 0, "Number of transaction coordinators must be more than 0");
+ WebTarget path = adminV3Transactions.path("transactionCoordinator");
+ path = path.path("replicas");
+ return asyncPostRequest(path, Entity.entity(replicas, MediaType.APPLICATION_JSON));
+ }
+
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c23d6f47cd5..299ee713c68 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2056,6 +2056,10 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("pending-ack-internal-stats -t test -s test"));
verify(transactions).getPendingAckInternalStats("test", "test", false);
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
+ verify(transactions).scaleTransactionCoordinators(3);
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index e6953817b0d..f6d11e5b4a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -178,6 +178,17 @@ public class CmdTransactions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Update the scale of transaction coordinators")
+ private class ScaleTransactionCoordinators extends CliCommand {
+ @Parameter(names = { "-r", "--replicas" }, description = "The scale of the transaction coordinators")
+ private int replicas;
+ @Override
+ void run() throws Exception {
+ getAdmin().transactions().scaleTransactionCoordinators(replicas);
+ }
+ }
+
+
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
jcommander.addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats());
@@ -189,5 +200,6 @@ public class CmdTransactions extends CmdBase {
jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
+ jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
}
}