You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 08:01:08 UTC

[pulsar] branch master updated: [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7164561dfc5 [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
7164561dfc5 is described below

commit 7164561dfc50cefb3f13e7ab62f3408e6066c059
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Apr 29 16:01:00 2022 +0800

    [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
---
 .../pulsar/broker/admin/impl/TransactionsBase.java | 14 +++++++
 .../pulsar/broker/admin/v3/Transactions.java       | 25 ++++++++++++
 .../pulsar/broker/admin/AdminApiTlsAuthTest.java   | 20 +++++++++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 47 ++++++++++++++++++++++
 .../apache/pulsar/client/admin/Transactions.java   | 15 +++++++
 .../client/admin/internal/TransactionsImpl.java    | 16 ++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 ++
 .../apache/pulsar/admin/cli/CmdTransactions.java   | 12 ++++++
 8 files changed, 153 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index bbd2e0be9c3..f1631e0751a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -432,4 +433,17 @@ public abstract class TransactionsBase extends AdminResource {
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    /** Raises the coordinator assign topic to {@code replicas} partitions once super-user access is validated. */
+    protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
+        return validateSuperUserAccessAsync().thenCompose(unused -> namespaceResources()
+                .getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, current -> {
+                    if (replicas > current.partitions) { // only a strict increase is accepted
+                        return new PartitionedTopicMetadata(replicas);
+                    }
+                    throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of transaction "
+                            + "coordinators should be more than the current number of transaction coordinator");
+                }));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 7b2c99ff9d5..48499134361 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -29,6 +29,7 @@ import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -36,6 +37,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.TransactionsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -314,4 +316,27 @@ public class Transactions extends TransactionsBase {
             resumeAsyncResponseExceptionally(asyncResponse, ex);
         }
     }
+
+    @POST // Scales the transaction coordinators up to the posted replica count.
+    @Path("/transactionCoordinator/replicas")
+    @ApiResponses(value = {
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 406, message = "The number of replicas should be more than "
+                    + "the current number of transaction coordinator replicas"),
+            @ApiResponse(code = 401, message = "This operation requires super-user access")})
+    public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncResponse, int replicas) {
+        try {
+            checkTransactionCoordinatorEnabled(); // a synchronous failure here is handled by the catch below
+            internalScaleTransactionCoordinators(replicas)
+                    .thenRun(() -> asyncResponse.resume(Response.noContent().build())) // success: empty response
+                    .exceptionally(e -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, e); // asynchronous failure path
+                        return null;
+                    });
+        } catch (Exception e) {
+            log.warn("{} Failed to update the scale of transaction coordinators", clientAppId());
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index 311e31be735..fcf68c907bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -48,6 +48,8 @@ import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ResourceGroup;
 import org.apache.pulsar.common.tls.NoopHostnameVerifier;
@@ -195,6 +197,24 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testSuperUserCanUpdateScaleOfTransactionCoordinators() throws Exception {
+        getPulsar().getConfiguration().setTransactionCoordinatorEnabled(true);
+        // Seed the coordinator assign topic with 3 partitions so the call below is a scale-up.
+        pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(3));
+        // try-with-resources guarantees the admin client is closed even if the call fails.
+        try (PulsarAdmin admin = buildAdminClient("admin")) {
+            admin.transactions().scaleTransactionCoordinators(4);
+        }
+        // Read the metadata back to confirm the partition count was raised.
+        int partitions = pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
+                .get().get().partitions;
+        Assert.assertEquals(partitions, 4);
+    }
+
     @Test
     public void testProxyRoleCantDeleteResourceGroups() throws Exception {
         try (PulsarAdmin admin = buildAdminClient("admin")) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index d46b640aac3..34492b3ca34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import com.google.common.collect.Sets;
+import java.util.Set;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -60,6 +61,9 @@ import org.testng.annotations.Test;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -519,6 +523,49 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException ex) {
             assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
         }
+        try {
+            admin.transactions().scaleTransactionCoordinators(1);
+        } catch (PulsarAdminException ex) {
+            assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
+        }
+    }
+
+    @Test
+    public void testUpdateTransactionCoordinatorNumber() throws Exception {
+        final int initialSize = 3;
+        final int scaledSize = initialSize * 2;
+        pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(initialSize));
+        // Asking for fewer coordinators than currently exist must be answered with 406.
+        try {
+            admin.transactions().scaleTransactionCoordinators(initialSize - 1);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), HttpStatus.SC_NOT_ACCEPTABLE);
+        }
+        // A negative size must surface the argument-check message as the exception cause.
+        try {
+            admin.transactions().scaleTransactionCoordinators(-1);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getCause().getMessage(), "Number of transaction coordinators must be more than 0");
+        }
+        // Scale up, build (and close) a transaction-enabled client, then wait for the store count to match.
+        admin.transactions().scaleTransactionCoordinators(scaledSize);
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        pulsarClient.close();
+        Awaitility.await().until(() -> pulsar.getTransactionMetadataStoreService().getStores().size() == scaledSize);
+        // With authentication on and every role reported as a proxy role, the call must be rejected.
+        pulsar.getConfiguration().setAuthenticationEnabled(true);
+        Set<String> proxyRoles = spy(Set.class);
+        doReturn(true).when(proxyRoles).contains(any());
+        pulsar.getConfiguration().setProxyRoles(proxyRoles);
+        try {
+            admin.transactions().scaleTransactionCoordinators(scaledSize + 1);
+            fail();
+        } catch (PulsarAdminException.NotAuthorizedException ignored) {
+        }
+    }
 
     private static void verifyCoordinatorStats(String state,
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index b082f1c1780..803f50f8bdd 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -243,4 +243,19 @@ public interface Transactions {
     TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, String subName,
                                                                   boolean metadata) throws PulsarAdminException;
 
+    /**
+     * Sets the number of transaction coordinators; currently only scaling up is supported.
+     * @param replicas the new number of transaction coordinators.
+     * @throws PulsarAdminException if the update fails.
+     */
+    void scaleTransactionCoordinators(int replicas) throws PulsarAdminException;
+
+    /**
+     * Asynchronously sets the number of transaction coordinators.
+     * Currently only scaling up is supported.
+     * @param replicas the new number of transaction coordinators.
+     * @return a future that completes once the number of transaction coordinators has been updated.
+     */
+    CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 46262d20531..d31ae9cfeaf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.api.Authentication;
@@ -335,4 +338,17 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         return sync(() -> getPendingAckInternalStatsAsync(topic, subName, metadata));
     }
 
+    @Override
+    public void scaleTransactionCoordinators(int replicas) throws PulsarAdminException {
+        sync(() -> this.scaleTransactionCoordinatorsAsync(replicas)); // synchronous wrapper over the async variant
+    }
+
+    @Override
+    public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas) {
+        // Reject non-positive sizes on the client side, before any request is built.
+        checkArgument(replicas > 0, "Number of transaction coordinators must be more than 0");
+        final WebTarget target = adminV3Transactions.path("transactionCoordinator").path("replicas");
+        return asyncPostRequest(target, Entity.entity(replicas, MediaType.APPLICATION_JSON));
+    }
+
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c23d6f47cd5..299ee713c68 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2056,6 +2056,10 @@ public class PulsarAdminToolTest {
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("pending-ack-internal-stats -t test -s test"));
         verify(transactions).getPendingAckInternalStats("test", "test", false);
+
+        cmdTransactions = new CmdTransactions(() -> admin);
+        cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
+        verify(transactions).scaleTransactionCoordinators(3);
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index e6953817b0d..f6d11e5b4a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -178,6 +178,17 @@ public class CmdTransactions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Update the scale of transaction coordinators")
+    private class ScaleTransactionCoordinators extends CliCommand { // CLI front end for the admin scale call
+        @Parameter(names = { "-r", "--replicas" }, description = "The scale of the transaction coordinators")
+        private int replicas; // not marked required, so it stays 0 when -r is omitted
+        @Override
+        void run() throws Exception {
+            getAdmin().transactions().scaleTransactionCoordinators(replicas); // delegates to the admin client
+        }
+    }
+
+
     public CmdTransactions(Supplier<PulsarAdmin> admin) {
         super("transactions", admin);
         jcommander.addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats());
@@ -189,5 +200,6 @@ public class CmdTransactions extends CmdBase {
         jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
         jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
         jcommander.addCommand("slow-transactions", new GetSlowTransactions());
+        jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
     }
 }