You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:48:50 UTC

[pulsar] branch branch-2.9 updated (cb4b004 -> 06a4678)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from cb4b004  [fix][txn]: fix pending ack is recovering throw CursorAlreadyClosedxception (#14781)
     new aa66cb8  Fix normal topic named ends with `healthcheck`  becomes system topic issue. (#14671)
     new e0d69a3  Provide an accurate error message when set ``autoTopicCreation `` (#14684)
     new e0eebaf  Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)
     new 973eadd  [fix][test] Fix wrong retry behavior in MetadataCacheTest (#14778)
     new d71b003  [fix][broker] Fix cannot delete namespace with system topic (#14730)
     new 862ebc5  [fix][admin-cli]: Remove the trust certs check (#14764)
     new 8a3eccb  [fix][admin] Fix NPE in PulsarAdminBuilder when the service is not set (#14769)
     new 36bbc18  Fix flaky test LeaderElectionTest (#14776)
     new 9ae8fae  Handle kafka sinks that return immutable maps as configs (#14780)
     new 60018ba  [fix][txn]: fix transaction buffer recover reader and writer fail (#14801)
     new 511644b  [fix][txn]: fix some exception handle in transaction buffer (#14808)
     new e397e1e  [fix][txn]: fix cannot enable transaction when is allow auto update schema disabled (#14809)
     new b75ac4b  [improve][tool] Improve transaction perf logs (#14816)
     new 06a4678  [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  14 +-
 .../pulsar/broker/service/AbstractTopic.java       |   6 +-
 .../pulsar/broker/service/BrokerService.java       |  33 +++-
 .../pulsar/broker/systopic/SystemTopicClient.java  |  25 ---
 .../TransactionBufferSystemTopicClient.java        |  22 ++-
 .../buffer/impl/TopicTransactionBuffer.java        | 196 ++++++++++++---------
 .../TopicTransactionBufferRecoverCallBack.java     |   2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |   7 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  45 ++++-
 .../pulsar/broker/service/BrokerServiceTest.java   |  34 ++++
 .../broker/service/InactiveTopicDeleteTest.java    |  65 +++----
 .../NamespaceEventsSystemTopicServiceTest.java     |   7 +-
 .../broker/systopic/SystemTopicClientTest.java     |  47 -----
 .../TopicTransactionBufferRecoverTest.java         |  62 ++++++-
 .../pulsar/broker/transaction/TransactionTest.java |  31 ++++
 .../data/{BundlesData.java => ValidateResult.java} |  27 ++-
 .../data/impl/AutoTopicCreationOverrideImpl.java   |  18 +-
 .../client/admin/internal/PulsarAdminImpl.java     |   3 +
 .../admin/internal/PulsarAdminBuilderImplTest.java |  20 ++-
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |   5 -
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   2 +
 .../client/impl/ConsumerBuilderImplTest.java       |  10 ++
 .../data/AutoTopicCreationOverrideTest.java        |  12 +-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  28 +--
 .../connect/SchemaedFileStreamSinkConnector.java   |  14 ++
 .../apache/pulsar/metadata/LeaderElectionTest.java |  13 +-
 .../apache/pulsar/metadata/MetadataCacheTest.java  |  32 ++++
 .../pulsar/testclient/PerformanceConsumer.java     |   9 +-
 .../pulsar/testclient/PerformanceProducer.java     |  12 +-
 .../websocket/AbstractWebSocketHandlerTest.java    |   3 +-
 31 files changed, 512 insertions(+), 299 deletions(-)
 delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/SystemTopicClientTest.java
 copy pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/{BundlesData.java => ValidateResult.java} (64%)
 copy pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyTypeTest.java => pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java (62%)

[pulsar] 05/14: [fix][broker] Fix cannot delete namespace with system topic (#14730)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d71b00304ff6073da7423c452e9afa574ce7769f
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Mar 22 19:26:18 2022 +0800

    [fix][broker] Fix cannot delete namespace with system topic (#14730)
    
    (cherry picked from commit 7556c4e0165660e8dbd141c4e93bb9e31a67e6f9)
---
 .../pulsar/broker/web/PulsarWebResource.java       |  7 +++-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 45 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e3ae827..7b5f455 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -722,7 +722,12 @@ public abstract class PulsarWebResource {
                 .getPoliciesAsync(namespace).thenAccept(policiesResult -> {
             if (policiesResult.isPresent()) {
                 Policies policies = policiesResult.get();
-                if (policies.replication_clusters.isEmpty()) {
+                if (policies.deleted) {
+                    String msg = String.format("Namespace %s is deleted", namespace.toString());
+                    log.warn(msg);
+                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+                            "Namespace is deleted"));
+                } else if (policies.replication_clusters.isEmpty()) {
                     String msg = String.format(
                             "Namespace does not have any clusters configured : local_cluster=%s ns=%s",
                             localCluster, namespace.toString());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index bc1ab25..0bf34a16fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.Response.Status;
 import lombok.Cleanup;
@@ -82,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -1356,6 +1357,48 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
     }
 
+    @Test
+    public void testDeleteNamespaceWithTopicPolicies() throws Exception {
+        stopBroker();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        setup();
+
+        String tenant = "test-tenant";
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        // create tenant
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
+        assertTrue(admin.tenants().getTenants().contains(tenant));
+
+        // create namespace2
+        String namespace = tenant + "/test-ns2";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        // create topic
+        String topic = namespace + "/test-topic2";
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        producer.send("test".getBytes(StandardCharsets.UTF_8));
+        BacklogQuota backlogQuota = BacklogQuotaImpl
+                .builder()
+                .limitTime(1000)
+                .limitSize(1000)
+                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.topicPolicies().setBacklogQuota(topic, backlogQuota);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topicPolicies()
+                    .getBacklogQuotaMap(topic)
+                    .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota);
+        });
+        producer.close();
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+        });
+    }
+
 
     @Test(timeOut = 30000)
     public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {

[pulsar] 06/14: [fix][admin-cli]: Remove the trust certs check (#14764)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 862ebc5724f9cccfa88c34a1b405f2ccb46d6a2e
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Mar 22 00:07:52 2022 +0800

    [fix][admin-cli]: Remove the trust certs check (#14764)
    
    (cherry picked from commit 44f92a8118e3ce0461cd4dc1736860fab77f0cae)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java       | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index eb1ca4f..d43c849 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -76,11 +76,6 @@ public class CmdClusters extends CmdBase {
                             "You must specify tls-trust-store-type, tls-trust-store and tls-trust-store-pwd"
                                     + " when enable tls-enable-keystore");
                 }
-            } else {
-                if (StringUtils.isBlank(clusterData.getBrokerClientTrustCertsFilePath())) {
-                    throw new RuntimeException("You must specify tls-trust-certs-filepath"
-                            + " when tls-enable-keystore is not enable");
-                }
             }
         }
     }

[pulsar] 14/14: [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06a467825aa1c1ec10518a3300088a1dfc96c594
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 22:44:40 2022 +0800

    [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)
    
    Co-authored-by: congbo <co...@github.com>
    (cherry picked from commit dca5a901528e77f218afb9870b223f06143b055f)
---
 .../TopicTransactionBufferRecoverTest.java         | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 01e03a4..392a21f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -454,7 +454,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
     @Test(timeOut=30000)
     public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
-        String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
+        String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
@@ -470,6 +470,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
 
+        producer.close();
+
         PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         TransactionBufferSnapshotService transactionBufferSnapshotService =
@@ -479,6 +481,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
         doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
         doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
+        doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
+        doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();
         Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
         field.setAccessible(true);
         TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
@@ -487,7 +491,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock create reader fail
@@ -497,7 +501,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
         doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
 
         // check create writer fail close topic
@@ -507,7 +511,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
                 .when(transactionBufferSnapshotService).createWriter(any());
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
 
     }
 
@@ -515,7 +519,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
                                  TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal,
                                  TransactionBufferSnapshotService transactionBufferSnapshotService,
                                  PersistentTopic originalTopic,
-                                 Field field, Producer<byte[]> producer) throws Exception {
+                                 Field field) throws Exception {
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
 
         // recover again will throw then close topic
@@ -529,13 +533,19 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
 
-        // topic recover success
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(originalTopic.getName())
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
+        producer.close();
     }
 
 }

[pulsar] 02/14: Provide an accurate error message when set ``autoTopicCreation `` (#14684)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0d69a31b684960dbb56f9b124c6f124ca55a034
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Mar 17 19:47:50 2022 +0800

    Provide an accurate error message when set ``autoTopicCreation `` (#14684)
    
    (cherry picked from commit 50a7e50745e5c26200a4abd2e63e3750e34339fb)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  7 ++--
 .../common/policies/data/ValidateResult.java       | 40 ++++++++++++++++++++++
 .../data/impl/AutoTopicCreationOverrideImpl.java   | 18 +++++-----
 .../data/AutoTopicCreationOverrideTest.java        | 12 +++----
 4 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 11d7b39..df8fd1c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -94,6 +94,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.ValidateResult;
 import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -803,9 +804,11 @@ public abstract class NamespacesBase extends AdminResource {
         validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         if (autoTopicCreationOverride != null) {
-            if (!AutoTopicCreationOverrideImpl.isValidOverride(autoTopicCreationOverride)) {
+            ValidateResult validateResult = AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
+            if (!validateResult.isSuccess()) {
                 throw new RestException(Status.PRECONDITION_FAILED,
-                        "Invalid configuration for autoTopicCreationOverride");
+                        "Invalid configuration for autoTopicCreationOverride. the detail is "
+                                + validateResult.getErrorInfo());
             }
             if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
                 throw new RestException(Status.NOT_ACCEPTABLE,
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ValidateResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ValidateResult.java
new file mode 100644
index 0000000..1382194
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ValidateResult.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.Getter;
+
+@Getter
+public class ValidateResult {
+    private final boolean success;
+    private final String errorInfo;
+
+    private ValidateResult(boolean success, String errorInfo) {
+        this.success = success;
+        this.errorInfo = errorInfo;
+    }
+
+    public static ValidateResult fail(String errorInfo) {
+        return new ValidateResult(false, errorInfo);
+    }
+
+    public static ValidateResult success() {
+        return new ValidateResult(true, null);
+    }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java
index 1ce60d1..ba6bc07 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.ValidateResult;
 
 /**
  * Override of autoTopicCreation settings on a namespace level.
@@ -35,28 +36,29 @@ public final class AutoTopicCreationOverrideImpl implements AutoTopicCreationOve
     private String topicType;
     private Integer defaultNumPartitions;
 
-    public static boolean isValidOverride(AutoTopicCreationOverride override) {
+    public static ValidateResult validateOverride(AutoTopicCreationOverride override) {
         if (override == null) {
-            return false;
+            return ValidateResult.fail("[AutoTopicCreationOverride] can not be null");
         }
         if (override.isAllowAutoTopicCreation()) {
             if (!TopicType.isValidTopicType(override.getTopicType())) {
-                return false;
+                return ValidateResult.fail(String.format("Unknown topic type [%s]", override.getTopicType()));
             }
             if (TopicType.PARTITIONED.toString().equals(override.getTopicType())) {
                 if (override.getDefaultNumPartitions() == null) {
-                    return false;
+                    return ValidateResult.fail("[defaultNumPartitions] cannot be null when the type is partitioned.");
                 }
-                if (!(override.getDefaultNumPartitions() > 0)) {
-                    return false;
+                if (override.getDefaultNumPartitions() <= 0) {
+                    return ValidateResult.fail("[defaultNumPartitions] cannot be less than 1 for partition type.");
                 }
             } else if (TopicType.NON_PARTITIONED.toString().equals(override.getTopicType())) {
                 if (override.getDefaultNumPartitions() != null) {
-                    return false;
+                    return ValidateResult.fail("[defaultNumPartitions] is not allowed to be"
+                            + " set when the type is non-partition.");
                 }
             }
         }
-        return true;
+        return ValidateResult.success();
     }
 
     public static AutoTopicCreationOverrideImplBuilder builder() {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
index 5092d43..66769f0 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
@@ -32,7 +32,7 @@ public class AutoTopicCreationOverrideTest {
                 .allowAutoTopicCreation(true)
                 .topicType(TopicType.NON_PARTITIONED.toString())
                 .build();
-        assertTrue(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertTrue(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 
     @Test
@@ -42,7 +42,7 @@ public class AutoTopicCreationOverrideTest {
                 .topicType(TopicType.PARTITIONED.toString())
                 .defaultNumPartitions(2)
                 .build();
-        assertTrue(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertTrue(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 
     @Test
@@ -51,7 +51,7 @@ public class AutoTopicCreationOverrideTest {
                 .allowAutoTopicCreation(true)
                 .topicType("aaa")
                 .build();
-        assertFalse(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertFalse(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 
     @Test
@@ -61,7 +61,7 @@ public class AutoTopicCreationOverrideTest {
                 .topicType(TopicType.PARTITIONED.toString())
                 .defaultNumPartitions(0)
                 .build();
-        assertFalse(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertFalse(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 
     @Test
@@ -70,7 +70,7 @@ public class AutoTopicCreationOverrideTest {
                 .allowAutoTopicCreation(true)
                 .topicType(TopicType.PARTITIONED.toString())
                 .build();
-        assertFalse(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertFalse(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 
     @Test
@@ -80,6 +80,6 @@ public class AutoTopicCreationOverrideTest {
                 .topicType(TopicType.NON_PARTITIONED.toString())
                 .defaultNumPartitions(2)
                 .build();
-        assertFalse(AutoTopicCreationOverrideImpl.isValidOverride(override));
+        assertFalse(AutoTopicCreationOverrideImpl.validateOverride(override).isSuccess());
     }
 }

[pulsar] 09/14: Handle kafka sinks that return immutable maps as configs (#14780)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ae8faed33e9dd3c46ad36f3e7872a6b8a2a62b3
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 22 00:51:39 2022 -0700

    Handle kafka sinks that return immutable maps as configs (#14780)
    
    (cherry picked from commit b56d7318e73fb6915208dbe1223446e759c2ed0b)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 28 +++++++++++++---------
 .../connect/SchemaedFileStreamSinkConnector.java   | 14 +++++++++++
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 268105c..e8165f8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -24,6 +24,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -44,17 +56,6 @@ import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
 
 @Slf4j
@@ -154,6 +155,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         Preconditions.checkNotNull(configs);
         Preconditions.checkArgument(configs.size() == 1);
 
+        // configs may contain immutable/unmodifiable maps
+        configs = configs.stream()
+                .map(HashMap::new)
+                .collect(Collectors.toList());
+
         configs.forEach(x -> {
             x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
         });
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
index a3cce92..4a78661 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
@@ -22,6 +22,11 @@ package org.apache.pulsar.io.kafka.connect;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * A FileStreamSinkConnector for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
@@ -31,4 +36,13 @@ public class SchemaedFileStreamSinkConnector extends FileStreamSinkConnector {
     public Class<? extends Task> taskClass() {
         return SchemaedFileStreamSinkTask.class;
     }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // to test cases when task return immutable maps as configs
+        return super.taskConfigs(maxTasks)
+                .stream()
+                .map(Collections::unmodifiableMap)
+                .collect(Collectors.toList());
+    }
 }

[pulsar] 08/14: Fix flaky test LeaderElectionTest (#14776)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 36bbc18956afe76c025ea626466599f5377106ce
Author: gaozhangmin <ga...@gmail.com>
AuthorDate: Tue Mar 22 12:09:00 2022 +0800

    Fix flaky test LeaderElectionTest (#14776)
    
    (cherry picked from commit 46886b03d0be904ac635d4ea8fa6d74f82b896e5)
---
 .../java/org/apache/pulsar/metadata/LeaderElectionTest.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 8412c71..c8a6fed 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata;
 
+import static org.apache.pulsar.metadata.MetadataCacheTest.assertEqualsAndRetry;
 import static org.testng.Assert.assertEquals;
 import java.util.EnumSet;
 import java.util.Optional;
@@ -111,21 +112,21 @@ public class LeaderElectionTest extends BaseMetadataStoreTest {
 
         LeaderElectionState les1 = le1.elect("test-1").join();
         assertEquals(les1, LeaderElectionState.Leading);
-        assertEquals(le1.getLeaderValueIfPresent(), Optional.of("test-1"));
+        assertEqualsAndRetry(() -> le1.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty());
         assertEquals(le1.getLeaderValue().join(), Optional.of("test-1"));
         assertEquals(n1.poll(3, TimeUnit.SECONDS), LeaderElectionState.Leading);
 
         LeaderElectionState les2 = le2.elect("test-2").join();
         assertEquals(les2, LeaderElectionState.Following);
         assertEquals(le2.getLeaderValue().join(), Optional.of("test-1"));
-        assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-1"));
+        assertEqualsAndRetry(() -> le2.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty());
         assertEquals(n2.poll(3, TimeUnit.SECONDS), LeaderElectionState.Following);
 
         le1.close();
 
         assertEquals(n2.poll(3, TimeUnit.SECONDS), LeaderElectionState.Leading);
         assertEquals(le2.getState(), LeaderElectionState.Leading);
-        assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-2"));
+        assertEqualsAndRetry(() -> le2.getLeaderValueIfPresent(), Optional.of("test-2"), Optional.empty());
         assertEquals(le2.getLeaderValue().join(), Optional.of("test-2"));
     }
 
@@ -209,7 +210,7 @@ public class LeaderElectionTest extends BaseMetadataStoreTest {
         LeaderElectionState les = le.elect("test-2").join();
         assertEquals(les, LeaderElectionState.Leading);
         assertEquals(le.getLeaderValue().join(), Optional.of("test-2"));
-        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-2"));
+        assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-2"), Optional.empty());
     }
 
     @Test(dataProvider = "impl")
@@ -239,7 +240,7 @@ public class LeaderElectionTest extends BaseMetadataStoreTest {
         LeaderElectionState les = le.elect("test-1").join();
         assertEquals(les, LeaderElectionState.Leading);
         assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
-        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+        assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty());
     }
 
 
@@ -275,6 +276,6 @@ public class LeaderElectionTest extends BaseMetadataStoreTest {
         LeaderElectionState les = le.elect("test-2").join();
         assertEquals(les, LeaderElectionState.Following);
         assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
-        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+        assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty());
     }
 }

[pulsar] 01/14: Fix normal topic named ends with `healthcheck` becomes system topic issue. (#14671)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aa66cb8c129d71fbb610f2a8528c7d81dffbbadb
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Mar 14 21:31:43 2022 +0800

    Fix normal topic named ends with `healthcheck`  becomes system topic issue. (#14671)
    
    (cherry picked from commit 3afd1e673304bb2dfb2c42f74a5895a0aa12cfc1)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  7 +--
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  7 +--
 .../pulsar/broker/service/AbstractTopic.java       |  3 +-
 .../pulsar/broker/service/BrokerService.java       | 33 +++++++++--
 .../pulsar/broker/systopic/SystemTopicClient.java  | 25 ---------
 .../pulsar/broker/service/BrokerServiceTest.java   | 34 +++++++++++
 .../broker/service/InactiveTopicDeleteTest.java    | 65 +++++++++-------------
 .../NamespaceEventsSystemTopicServiceTest.java     |  7 ++-
 .../broker/systopic/SystemTopicClientTest.java     | 47 ----------------
 9 files changed, 98 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 8d7de94..3598e03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -586,12 +585,12 @@ public abstract class AdminResource extends PulsarWebResource {
             }
 
             // new create check
-            if (maxTopicsPerNamespace > 0 && !SystemTopicClient.isSystemTopic(topicName)) {
+            if (maxTopicsPerNamespace > 0 && !pulsar().getBrokerService().isSystemTopic(topicName)) {
                 List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
                 // exclude created system topic
                 long topicsCount =
-                        partitionedTopics.stream().filter(t -> !SystemTopicClient.isSystemTopic(TopicName.get(t)))
-                                .count();
+                        partitionedTopics.stream().filter(t ->
+                                        !pulsar().getBrokerService().isSystemTopic(TopicName.get(t))).count();
                 if (topicsCount + numPartitions > maxTopicsPerNamespace) {
                     log.error("[{}] Failed to create partitioned topic {}, "
                             + "exceed maximum number of topics in namespace", clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 310c0d9..11d7b39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -237,7 +236,7 @@ public abstract class NamespacesBase extends AdminResource {
             }
             boolean hasNonSystemTopic = false;
             for (String topic : topics) {
-                if (!SystemTopicClient.isSystemTopic(TopicName.get(topic))) {
+                if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                     hasNonSystemTopic = true;
                     break;
                 }
@@ -1892,14 +1891,14 @@ public abstract class NamespacesBase extends AdminResource {
                 }
                 for (Topic topic : topicList) {
                     if (topic instanceof PersistentTopic
-                            && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+                            && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
                         futures.add(((PersistentTopic) topic).clearBacklog(subscription));
                     }
                 }
             } else {
                 for (Topic topic : topicList) {
                     if (topic instanceof PersistentTopic
-                            && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+                            && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
                         futures.add(((PersistentTopic) topic).clearBacklog());
                     }
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 620d35e..cc29202 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -540,7 +539,7 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected void setSchemaCompatibilityStrategy(Policies policies) {
-        if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+        if (isSystemTopic()) {
             schemaCompatibilityStrategy =
                     brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1c45202..939a389 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -117,7 +117,7 @@ import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.BindAddressValidator;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -129,6 +129,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.BindAddress;
 import org.apache.pulsar.common.configuration.FieldContext;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
@@ -2671,8 +2672,30 @@ public class BrokerService implements Closeable {
         log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
         return null;
     }
-    private boolean isSystemTopic(String topic) {
-        return SystemTopicClient.isSystemTopic(TopicName.get(topic));
+
+    public boolean isSystemTopic(String topic) {
+        return isSystemTopic(TopicName.get(topic));
+    }
+
+    public boolean isSystemTopic(TopicName topicName) {
+        if (topicName.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)
+                || topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV2())) {
+            return true;
+        }
+
+        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+
+        // event topic
+        if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
+            return true;
+        }
+
+        String localName = nonePartitionedTopicName.getLocalName();
+        // transaction pending ack topic
+        if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
+            return true;
+        }
+        return false;
     }
 
     /**
@@ -2707,13 +2730,13 @@ public class BrokerService implements Closeable {
                     int maxTopicsPerNamespace = optPolicies.map(p -> p.max_topics_per_namespace)
                             .orElse(pulsar.getConfig().getMaxTopicsPerNamespace());
 
-                    if (maxTopicsPerNamespace > 0 && !SystemTopicClient.isSystemTopic(topicName)) {
+                    if (maxTopicsPerNamespace > 0 && !isSystemTopic(topicName)) {
                         return pulsar().getPulsarResources().getTopicResources()
                                 .getExistingPartitions(topicName)
                                 .thenCompose(topics -> {
                                     // exclude created system topic
                                     long topicsCount = topics.stream()
-                                            .filter(t -> !SystemTopicClient.isSystemTopic(TopicName.get(t)))
+                                            .filter(t -> !isSystemTopic(TopicName.get(t)))
                                             .count();
                                     if (topicsCount + numPartitions > maxTopicsPerNamespace) {
                                         log.error("Failed to create persistent topic {}, "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 122bd9f..33bfc59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -21,13 +21,9 @@ package org.apache.pulsar.broker.systopic;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.broker.admin.impl.BrokersBase;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 
 /**
@@ -188,25 +184,4 @@ public interface SystemTopicClient<T> {
          */
         SystemTopicClient<T> getSystemTopic();
     }
-
-    static boolean isSystemTopic(TopicName topicName) {
-        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
-
-        // event topic
-        if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
-            return true;
-        }
-
-        String localName = nonePartitionedTopicName.getLocalName();
-        // transaction pending ack topic
-        if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
-            return true;
-        }
-        // health check topic
-        if (StringUtils.endsWith(localName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){
-            return true;
-        }
-        return false;
-    }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index af8ddb5..c7ffef1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -66,6 +69,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -723,6 +727,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         } finally {
             pulsarClient.close();
         }
+        resetState();
     }
 
     @SuppressWarnings("deprecation")
@@ -1275,4 +1280,33 @@ public class BrokerServiceTest extends BrokerTestBase {
             getStatsThread.join();
         }
     }
+
+    @Test
+    public void testIsSystemTopic() {
+        BrokerService brokerService = pulsar.getBrokerService();
+        assertFalse(brokerService.isSystemTopic(TopicName.get("test")));
+        assertFalse(brokerService.isSystemTopic(TopicName.get("public/default/test")));
+        assertFalse(brokerService.isSystemTopic(TopicName.get("healthcheck")));
+        assertFalse(brokerService.isSystemTopic(TopicName.get("public/default/healthcheck")));
+        assertFalse(brokerService.isSystemTopic(TopicName.get("persistent://public/default/test")));
+        assertFalse(brokerService.isSystemTopic(TopicName.get("non-persistent://public/default/test")));
+
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__change_events")));
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__change_events-partition-0")));
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__change_events-partition-1")));
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot")));
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0")));
+        assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1")));
+        assertTrue(brokerService.isSystemTopic(TopicName
+                .get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack")));
+        assertTrue(brokerService.isSystemTopic(
+                TopicName.get("topicxxx-multiTopicsReader-f433329d68__transaction_pending_ack")));
+
+        assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
+        assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
+        NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfig());
+        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig());
+        assertTrue(brokerService.isSystemTopic("persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"));
+        assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() + "/healthcheck"));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 3b7d9aa..1a2dd42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -35,11 +35,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.admin.impl.BrokersBase;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
@@ -580,49 +582,32 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
     }
 
     @Test(timeOut = 30000)
-    public void testInternalTopicInactiveNotClean() throws Exception {
+    public void testHealthTopicInactiveNotClean() throws Exception {
         conf.setSystemTopicEnabled(true);
         conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
         conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
         // init topic
-        final String healthCheckTopic = "persistent://prop/ns-abc/"+ BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
-        final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";
-
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .create();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub")
-                .subscribe();
-
-        Producer<byte[]> heathCheckProducer = pulsarClient.newProducer()
-                .topic(healthCheckTopic)
-                .create();
-        Consumer<byte[]> heathCheckConsumer = pulsarClient.newConsumer()
-                .topic(healthCheckTopic)
-                .subscriptionName("healthCheck")
-                .subscribe();
-
-        consumer.close();
-        producer.close();
-        heathCheckConsumer.close();
-        heathCheckProducer.close();
-
-        Awaitility.await().untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
-                .contains(topic)));
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic));
-        });
-
-        admin.topics().deleteSubscription(topic, "sub");
-        admin.topics().deleteSubscription(healthCheckTopic, "healthCheck");
-
-        Awaitility.await().untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
-                .contains(topic)));
-        Awaitility.await().pollDelay(2, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
-                        .contains(healthCheckTopic)));
+        NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfig());
+        final String healthCheckTopicV1 = "persistent://" + heartbeatNamespaceV1 + "/healthcheck";
+
+        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig());
+        final String healthCheckTopicV2 = "persistent://" + heartbeatNamespaceV2 + "/healthcheck";
+
+        admin.brokers().healthcheck(TopicVersion.V1);
+        admin.brokers().healthcheck(TopicVersion.V2);
+
+        List<String> V1Partitions = pulsar
+                .getPulsarResources()
+                .getTopicResources()
+                .getExistingPartitions(TopicName.get(healthCheckTopicV1))
+                .get(10, TimeUnit.SECONDS);
+        List<String> V2Partitions = pulsar
+                .getPulsarResources()
+                .getTopicResources()
+                .getExistingPartitions(TopicName.get(healthCheckTopicV2))
+                .get(10, TimeUnit.SECONDS);
+        Assert.assertTrue(V1Partitions.contains(healthCheckTopicV1));
+        Assert.assertTrue(V2Partitions.contains(healthCheckTopicV2));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 2daca67..d81c89d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.systopic;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
@@ -143,9 +144,9 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
         admin.topics().createPartitionedTopic(normalTopic, 3);
         TopicName systemTopicName = TopicName.get(systemTopic);
         TopicName normalTopicName = TopicName.get(normalTopic);
-
-        Assert.assertEquals(SystemTopicClient.isSystemTopic(systemTopicName), true);
-        Assert.assertEquals(SystemTopicClient.isSystemTopic(normalTopicName), false);
+        BrokerService brokerService = pulsar.getBrokerService();
+        Assert.assertEquals(brokerService.isSystemTopic(systemTopicName), true);
+        Assert.assertEquals(brokerService.isSystemTopic(normalTopicName), false);
     }
 
     private void prepareData() throws PulsarAdminException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/SystemTopicClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/SystemTopicClientTest.java
deleted file mode 100644
index d6790b7..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/SystemTopicClientTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.systopic;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import org.apache.pulsar.common.naming.TopicName;
-import org.testng.annotations.Test;
-
-public class SystemTopicClientTest {
-
-    @Test
-    public void testIsSystemTopic() {
-        assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("test")));
-        assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("public/default/test")));
-        assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("persistent://public/default/test")));
-        assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("non-persistent://public/default/test")));
-
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events-partition-0")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events-partition-1")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1")));
-        assertTrue(SystemTopicClient.isSystemTopic(TopicName
-                .get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack")));
-        assertTrue(SystemTopicClient.isSystemTopic(
-                TopicName.get("topicxxx-multiTopicsReader-f433329d68__transaction_pending_ack")));
-
-    }
-}

[pulsar] 12/14: [fix][txn]: fix cannot enable transaction when is allow auto update schema disabled (#14809)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e397e1e4c4b36701b8b11b7164c4481a16f359fb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Mar 24 15:49:01 2022 +0800

    [fix][txn]: fix cannot enable transaction when is allow auto update schema disabled (#14809)
    
    (cherry picked from commit b5b0967f12174ba35baaf25092ac521f281e6b7d)
---
 .../pulsar/broker/service/AbstractTopic.java       |  3 +++
 .../pulsar/broker/transaction/TransactionTest.java | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cc29202..caa0ea2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -356,6 +356,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     private boolean allowAutoUpdateSchema() {
+        if (brokerService.isSystemTopic(topic)) {
+            return true;
+        }
         if (isAllowAutoUpdateSchema == null) {
             return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 308fa35..6fd1e92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -89,12 +90,14 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -850,4 +853,32 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
 
     }
+
+    @Test
+    public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
+        String namespace = TENANT + "/ns2";
+        String topic = namespace + "/test";
+        pulsarServiceList.forEach((pulsarService ->
+                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false)));
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        TopicName transactionBufferTopicName =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
+        TopicName transactionBufferTopicName1 =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
+        Awaitility.await().untilAsserted(() -> {
+            SchemaInfo schemaInfo = admin
+                    .schemas()
+                    .getSchemaInfo(transactionBufferTopicName.toString());
+            Assert.assertNotNull(schemaInfo);
+            SchemaInfo schemaInfo1 = admin
+                    .schemas()
+                    .getSchemaInfo(transactionBufferTopicName1.toString());
+            Assert.assertNotNull(schemaInfo1);
+        });
+        pulsarServiceList.forEach((pulsarService ->
+                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
+    }
 }
\ No newline at end of file

[pulsar] 07/14: [fix][admin] Fix NPE in PulsarAdminBuilder when the service is not set (#14769)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8a3eccbffaa29c5cca3667e1bc3273483f6adcf1
Author: Zike Yang <zk...@streamnative.io>
AuthorDate: Tue Mar 22 18:29:23 2022 +0800

    [fix][admin] Fix NPE in PulsarAdminBuilder when the service is not set (#14769)
    
    (cherry picked from commit 0b6b1e2079a9afe92527df2cdb0e843a0ae391dd)
---
 .../client/admin/internal/PulsarAdminImpl.java     |  3 ++
 .../admin/internal/PulsarAdminBuilderImplTest.java | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index 80cd978..427ab6d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import java.io.IOException;
 import java.net.URL;
 import java.util.Map;
@@ -148,6 +149,8 @@ public class PulsarAdminImpl implements PulsarAdmin {
                        int autoCertRefreshTime,
                        TimeUnit autoCertRefreshTimeUnit,
                        ClassLoader clientBuilderClassLoader) throws PulsarClientException {
+        checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs to be specified");
+
         this.connectTimeout = connectTimeout;
         this.connectTimeoutUnit = connectTimeoutUnit;
         this.readTimeout = readTimeout;
diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
new file mode 100644
index 0000000..1ea45401
--- /dev/null
+++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.annotations.Test;
+
+public class PulsarAdminBuilderImplTest {
+
+    @Test
+    public void testAdminBuilderWithServiceUrlNotSet() throws PulsarClientException {
+        try{
+            PulsarAdmin.builder().build();
+            fail();
+        } catch (IllegalArgumentException exception) {
+            assertEquals("Service URL needs to be specified", exception.getMessage());
+        }
+    }
+}

[pulsar] 13/14: [improve][tool] Improve transaction perf logs (#14816)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b75ac4bdfb312dc6b028f5341615ac9de1e1d8ff
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Mar 24 23:07:19 2022 +0800

    [improve][tool] Improve transaction perf logs (#14816)
    
    (cherry picked from commit f74a58686865edd1a7a6dbba20423a80eaa04d4c)
---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java    |  9 +++++++--
 .../org/apache/pulsar/testclient/PerformanceProducer.java    | 12 ++++++++----
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 3c7bed9..77bef8c 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -439,6 +439,9 @@ public class PerformanceConsumer {
                     if (!arguments.isAbortTransaction) {
                         transaction.commit()
                                 .thenRun(() -> {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Commit transaction {}", transaction.getTxnID());
+                                    }
                                     totalEndTxnOpSuccessNum.increment();
                                     numTxnOpSuccess.increment();
                                 })
@@ -449,11 +452,13 @@ public class PerformanceConsumer {
                                 });
                     } else {
                         transaction.abort().thenRun(() -> {
-                            log.info("Abort transaction {}", transaction.getTxnID().toString());
+                            if (log.isDebugEnabled()) {
+                                log.debug("Abort transaction {}", transaction.getTxnID());
+                            }
                             totalEndTxnOpSuccessNum.increment();
                             numTxnOpSuccess.increment();
                         }).exceptionally(exception -> {
-                            log.error("Commit transaction {} failed with exception",
+                            log.error("Abort transaction {} failed with exception",
                                     transaction.getTxnID().toString(),
                                     exception);
                             totalEndTxnOpFailNum.increment();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 0e3d550..c18cb6c 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -761,8 +761,10 @@ public class PerformanceProducer {
                         if (!arguments.isAbortTransaction) {
                             transaction.commit()
                                     .thenRun(() -> {
-                                        log.info("Committed transaction {}",
-                                                transaction.getTxnID().toString());
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Committed transaction {}",
+                                                    transaction.getTxnID().toString());
+                                        }
                                         totalEndTxnOpSuccessNum.increment();
                                         numTxnOpSuccess.increment();
                                     })
@@ -774,11 +776,13 @@ public class PerformanceProducer {
                                     });
                         } else {
                             transaction.abort().thenRun(() -> {
-                                log.info("Abort transaction {}", transaction.getTxnID().toString());
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Abort transaction {}", transaction.getTxnID().toString());
+                                }
                                 totalEndTxnOpSuccessNum.increment();
                                 numTxnOpSuccess.increment();
                             }).exceptionally(exception -> {
-                                log.error("Commit transaction {} failed with exception",
+                                log.error("Abort transaction {} failed with exception",
                                         transaction.getTxnID().toString(),
                                         exception);
                                 totalEndTxnOpFailNum.increment();

[pulsar] 04/14: [fix][test] Fix wrong retry behavior in MetadataCacheTest (#14778)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 973eadd084b4d99a5098f02596bc762b947a2c00
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Tue Mar 22 00:13:54 2022 +0800

    [fix][test] Fix wrong retry behavior in MetadataCacheTest (#14778)
    
    (cherry picked from commit f89001743b43ade0d1c0fa0440e2d03676e9b3ed)
---
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 32 ++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 70ed621..322c9bb 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -36,6 +36,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
@@ -555,4 +556,35 @@ public class MetadataCacheTest extends BaseMetadataStoreTest {
         assertEquals(res.getValue().b, 2);
         assertEquals(res.getValue().path, key1);
     }
+
+    public static void assertEqualsAndRetry(Supplier<Object> actual,
+                                            Object expected,
+                                            Object expectedAndRetry) throws Exception {
+        assertEqualsAndRetry(actual, expected, expectedAndRetry, 5, 100);
+    }
+
+    public static void assertEqualsAndRetry(Supplier<Object> actual,
+                                            Object expected,
+                                            Object expectedAndRetry,
+                                            int retryCount,
+                                            long intSleepTimeInMillis) throws Exception {
+        assertTrue(retryStrategically((__) -> {
+            if (actual.get().equals(expectedAndRetry)) {
+                return false;
+            }
+            assertEquals(actual.get(), expected);
+            return true;
+        }, retryCount, intSleepTimeInMillis));
+    }
+
+    public static boolean retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+            throws Exception {
+        for (int i = 0; i < retryCount; i++) {
+            if (predicate.test(null)) {
+                return true;
+            }
+            Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+        }
+        return false;
+    }
 }

[pulsar] 10/14: [fix][txn]: fix transaction buffer recover reader and writer fail (#14801)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 60018ba26de64e35343b4f617c625659db312a3e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 14:10:20 2022 +0800

    [fix][txn]: fix transaction buffer recover reader and writer fail (#14801)
    
    ### Motivation
    When Transaction buffer recover create reader or create writer fail or read snapshot fail throw PulsarClientException, we should rerecover this topic so close this topic to reinit.
    
    (cherry picked from commit bf568633d843f0e6d66b844a72d9d2178171626a)
---
 .../TransactionBufferSystemTopicClient.java        |  22 ++-
 .../buffer/impl/TopicTransactionBuffer.java        | 184 ++++++++++++---------
 .../TopicTransactionBufferRecoverCallBack.java     |   2 +-
 .../TopicTransactionBufferRecoverTest.java         |  48 +++++-
 4 files changed, 164 insertions(+), 92 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
index 807bb9d..aaab858 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
@@ -128,10 +128,17 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr
 
         @Override
         public CompletableFuture<Void> closeAsync() {
-            return producer.closeAsync().thenCompose(v -> {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the producer
                 transactionBufferSystemTopicClient.removeWriter(this);
-                return CompletableFuture.completedFuture(null);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
             });
+            return completableFuture;
         }
 
         @Override
@@ -179,10 +186,17 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr
 
         @Override
         public CompletableFuture<Void> closeAsync() {
-            return reader.closeAsync().thenCompose(v -> {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            reader.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the reader
                 transactionBufferSystemTopicClient.removeReader(this);
-                return CompletableFuture.completedFuture(null);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
             });
+            return completableFuture;
         }
 
         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index f108a16..3d6e7da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -178,15 +179,24 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                     }
 
                     @Override
-                    public void recoverExceptionally(Exception e) {
-                        if (e instanceof PulsarClientException.BrokerMetadataException) {
+                    public void recoverExceptionally(Throwable e) {
+
+                        // when create reader or writer fail throw PulsarClientException,
+                        // should close this topic and then reinit this topic
+                        if (e instanceof PulsarClientException) {
+                            // if transaction buffer recover fail throw PulsarClientException,
+                            // we need to change the PulsarClientException to ServiceUnitNotReadyException,
+                            // the tc do op will retry
+                            transactionBufferFuture.completeExceptionally
+                                    (new BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e));
                             log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
                                     + "transaction buffer throw exception", topic.getName(), e);
-                            topic.close();
+                        } else {
+                            transactionBufferFuture.completeExceptionally(e);
                         }
-                        transactionBufferFuture.completeExceptionally(e);
+                        topic.close(true);
                     }
-                }, this.topic, this));
+                }, this.topic, this, takeSnapshotWriter));
     }
 
     @Override
@@ -541,98 +551,112 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
         private final TopicTransactionBuffer topicTransactionBuffer;
 
+        private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter;
+
         private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, PersistentTopic topic,
-                                              TopicTransactionBuffer transactionBuffer) {
+                                              TopicTransactionBuffer transactionBuffer, CompletableFuture<
+                SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter) {
             this.topic = topic;
             this.callBack = callBack;
             this.entryQueue = new SpscArrayQueue<>(2000);
             this.topicTransactionBuffer = transactionBuffer;
+            this.takeSnapshotWriter = takeSnapshotWriter;
         }
 
         @SneakyThrows
         @Override
         public void run() {
-            if (!this.topicTransactionBuffer.changeToInitializingState()) {
-                log.warn("TransactionBuffer {} of topic {} can not change state to Initializing",
-                        this, topic.getName());
-                return;
-            }
-            topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
-                    .createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
-                try {
-                    boolean hasSnapshot = false;
-                    while (reader.hasMoreEvents()) {
-                        Message<TransactionBufferSnapshot> message = reader.readNext();
-                        if (topic.getName().equals(message.getKey())) {
-                            TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
-                            if (transactionBufferSnapshot != null) {
-                                hasSnapshot = true;
-                                callBack.handleSnapshot(transactionBufferSnapshot);
-                                this.startReadCursorPosition = PositionImpl.get(
-                                        transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                        transactionBufferSnapshot.getMaxReadPositionEntryId());
-                            }
-                        }
-                    }
-                    if (!hasSnapshot) {
-                        callBack.noNeedToRecover();
-                        return;
-                    }
-                } catch (PulsarClientException pulsarClientException) {
-                    log.error("[{}]Transaction buffer recover fail when read "
-                            + "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
-                    callBack.recoverExceptionally(pulsarClientException);
-                    reader.closeAsync().exceptionally(e -> {
-                        log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
-                        return null;
-                    });
+            this.takeSnapshotWriter.thenRunAsync(() -> {
+                if (!this.topicTransactionBuffer.changeToInitializingState()) {
+                    log.warn("TransactionBuffer {} of topic {} can not change state to Initializing",
+                            this, topic.getName());
                     return;
                 }
-                reader.closeAsync().exceptionally(e -> {
-                    log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
-                    return null;
-                });
-
-                ManagedCursor managedCursor;
-                try {
-                    managedCursor = topic.getManagedLedger()
-                            .newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
-                } catch (ManagedLedgerException e) {
-                    callBack.recoverExceptionally(e);
-                    log.error("[{}]Transaction buffer recover fail when open cursor!", topic.getName(), e);
-                    return;
-                }
-                PositionImpl lastConfirmedEntry = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
-                PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition;
-                FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue,
-                        managedCursor, TopicTransactionBufferRecover.this);
-                if (lastConfirmedEntry.getEntryId() != -1) {
-                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
-                            && fillEntryQueueCallback.fillQueue()) {
-                        Entry entry = entryQueue.poll();
-                        if (entry != null) {
+                topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
+                        .createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
                             try {
-                                currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                                callBack.handleTxnEntry(entry);
-                            } finally {
-                                entry.release();
+                                boolean hasSnapshot = false;
+                                while (reader.hasMoreEvents()) {
+                                    Message<TransactionBufferSnapshot> message = reader.readNext();
+                                    if (topic.getName().equals(message.getKey())) {
+                                        TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
+                                        if (transactionBufferSnapshot != null) {
+                                            hasSnapshot = true;
+                                            callBack.handleSnapshot(transactionBufferSnapshot);
+                                            this.startReadCursorPosition = PositionImpl.get(
+                                                    transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                                    transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                        }
+                                    }
+                                }
+                                if (!hasSnapshot) {
+                                    callBack.noNeedToRecover();
+                                    return;
+                                }
+                            } catch (PulsarClientException pulsarClientException) {
+                                log.error("[{}]Transaction buffer recover fail when read "
+                                        + "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
+                                callBack.recoverExceptionally(pulsarClientException);
+                                reader.closeAsync().exceptionally(e -> {
+                                    log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
+                                    return null;
+                                });
+                                return;
                             }
-                        } else {
+                            reader.closeAsync().exceptionally(e -> {
+                                log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
+                                return null;
+                            });
+
+                            ManagedCursor managedCursor;
                             try {
-                                Thread.sleep(1);
-                            } catch (InterruptedException e) {
-                                //no-op
+                                managedCursor = topic.getManagedLedger()
+                                        .newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
+                            } catch (ManagedLedgerException e) {
+                                callBack.recoverExceptionally(e);
+                                log.error("[{}]Transaction buffer recover fail when open cursor!", topic.getName(), e);
+                                return;
+                            }
+                            PositionImpl lastConfirmedEntry =
+                                    (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+                            PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition;
+                            FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue,
+                                    managedCursor, TopicTransactionBufferRecover.this);
+                            if (lastConfirmedEntry.getEntryId() != -1) {
+                                while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
+                                        && fillEntryQueueCallback.fillQueue()) {
+                                    Entry entry = entryQueue.poll();
+                                    if (entry != null) {
+                                        try {
+                                            currentLoadPosition = PositionImpl.get(entry.getLedgerId(),
+                                                    entry.getEntryId());
+                                            callBack.handleTxnEntry(entry);
+                                        } finally {
+                                            entry.release();
+                                        }
+                                    } else {
+                                        try {
+                                            Thread.sleep(1);
+                                        } catch (InterruptedException e) {
+                                            //no-op
+                                        }
+                                    }
+                                }
                             }
-                        }
-                    }
-                }
 
-                closeCursor(managedCursor);
-                callBack.recoverComplete();
-            }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this))
-                    .exceptionally(e -> {
-                callBack.recoverExceptionally(new Exception(e));
-                log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e);
+                            closeCursor(managedCursor);
+                            callBack.recoverComplete();
+                        }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                                .getExecutor(this)).exceptionally(e -> {
+                            callBack.recoverExceptionally(e.getCause());
+                            log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e);
+                            return null;
+                        });
+            }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                    .getExecutor(this)).exceptionally(e -> {
+                callBack.recoverExceptionally(e.getCause());
+                log.error("[{}]Transaction buffer create snapshot writer fail!",
+                        topic.getName(), e);
                 return null;
             });
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 1640459..87b8e93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -51,5 +51,5 @@ public interface TopicTransactionBufferRecoverCallBack {
     /**
      * Topic transaction buffer recover exceptionally.
      */
-    void recoverExceptionally(Exception e);
+    void recoverExceptionally(Throwable e);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 5701d22..01e03a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
@@ -452,7 +453,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
 
     @Test(timeOut=30000)
-    public void testTransactionBufferRecoverThrowBrokerMetadataException() throws Exception {
+    public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
         String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
@@ -469,23 +470,55 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
 
-        // take snapshot
         PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         TransactionBufferSnapshotService transactionBufferSnapshotService =
                 mock(TransactionBufferSnapshotService.class);
         SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class);
-        // mock reader can't read snapshot fail
-        doThrow(new PulsarClientException.BrokerMetadataException("")).when(reader).hasMoreEvents();
-        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+        SystemTopicClient.Writer<TransactionBufferSnapshot> writer = mock(SystemTopicClient.Writer.class);
 
+        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+        doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
         Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
         field.setAccessible(true);
         TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
                 (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
+        // mock reader can't read snapshot fail
+        doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
+        // check reader close topic
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+                transactionBufferSnapshotService, originalTopic, field, producer);
+        doReturn(true).when(reader).hasMoreEvents();
+
+        // mock create reader fail
+        doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
+                .when(transactionBufferSnapshotService).createReader(any());
+        // check create reader fail close topic
+        originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+                transactionBufferSnapshotService, originalTopic, field, producer);
+        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+
+        // check create writer fail close topic
+        originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        // mock create writer fail
+        doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
+                .when(transactionBufferSnapshotService).createWriter(any());
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+                transactionBufferSnapshotService, originalTopic, field, producer);
+
+    }
+
+    private void checkCloseTopic(PulsarClient pulsarClient,
+                                 TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal,
+                                 TransactionBufferSnapshotService transactionBufferSnapshotService,
+                                 PersistentTopic originalTopic,
+                                 Field field, Producer<byte[]> producer) throws Exception {
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
 
-        // recover again will throw BrokerMetadataException then close topic
+        // recover again will throw then close topic
         new TopicTransactionBuffer(originalTopic);
         Awaitility.await().untilAsserted(() -> {
             // isFenced means closed
@@ -493,10 +526,11 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
             close.setAccessible(true);
             assertTrue((boolean) close.get(originalTopic));
         });
+
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
 
         // topic recover success
-        txn = pulsarClient.newTransaction()
+        Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 

[pulsar] 03/14: Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0eebaf1d00ea4a0cddd8464c3be26ef531387fa
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Fri Mar 18 12:05:26 2022 +0800

    Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    Fixes #14704
    
    ### Motivation
    
    When the user uses the function of DeadLetterPolicy, it is better than misoperation. MaxRedeliverCount may be set to 0. When it is set to 0, according to the current processing logic of the Java Client, the message will be pushed to the DeadLetter Topic every time.
    
    ### Modifications
    
    - When MaxRedeliverCount <= 0 in DeadLetterPolicy, we reset MaxRedeliverCount to default value
    
    (cherry picked from commit 601fbdd40eabfcd5d5519e0c5bcc20ae280a8e18)
---
 .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java     |  2 ++
 .../org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java | 10 ++++++++++
 .../apache/pulsar/websocket/AbstractWebSocketHandlerTest.java  |  3 ++-
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 471d4ba..dea946b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -428,6 +428,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             if (conf.getAckTimeoutMillis() == 0) {
                 conf.setAckTimeoutMillis(DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER);
             }
+
+            checkArgument(deadLetterPolicy.getMaxRedeliverCount() > 0, "MaxRedeliverCount must be > 0.");
             conf.setDeadLetterPolicy(deadLetterPolicy);
         }
         return this;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 13d63ba..36ea53f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -288,6 +289,15 @@ public class ConsumerBuilderImplTest {
                 .build());
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testRedeliverCountOfDeadLetterPolicy() {
+        consumerBuilderImpl.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(0)
+                .deadLetterTopic("test-dead-letter-topic")
+                .retryLetterTopic("test-retry-letter-topic")
+                .build());
+    }
+
     @Test
     public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
         consumerBuilderImpl.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 9bd9907..782e05e 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -369,11 +369,12 @@ public class AbstractWebSocketHandlerTest {
         consumerHandler.clearQueryParams();
         consumerHandler.putQueryParam("receiverQueueSize", "1001");
         consumerHandler.putQueryParam("deadLetterTopic", "dead-letter-topic");
+        consumerHandler.putQueryParam("maxRedeliverCount", "3");
 
         conf = consumerHandler.getConf();
         // receive queue size is the minimum value of default value (1000) and user defined value(1001)
         assertEquals(conf.getReceiverQueueSize(), 1000);
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
-        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 0);
+        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
 }

[pulsar] 11/14: [fix][txn]: fix some exception handle in transaction buffer (#14808)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 511644b3894da3ba4e984a60b11ffd0f032df729
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 15:46:14 2022 +0800

    [fix][txn]: fix some exception handle in transaction buffer (#14808)
    
    (cherry picked from commit 4824912615bf85f836a72051117462471b0ab306)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java      | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 3d6e7da..7ed656f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -227,8 +227,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                     completableFuture.complete(null);
                 }
             }).exceptionally(exception -> {
-                log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception);
-                completableFuture.completeExceptionally(exception);
+                log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause());
+                completableFuture.completeExceptionally(exception.getCause());
                 return null;
             });
             return completableFuture;
@@ -305,8 +305,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                 commitMarker.release();
             }
         }).exceptionally(exception -> {
-            log.error("Transaction {} commit on topic {}.", txnID.toString(), topic.getName(), exception);
-            completableFuture.completeExceptionally(exception);
+            log.error("Transaction {} commit on topic {}.", txnID.toString(), topic.getName(), exception.getCause());
+            completableFuture.completeExceptionally(exception.getCause());
             return null;
         });
         return completableFuture;
@@ -351,8 +351,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                 abortMarker.release();
             }
         }).exceptionally(exception -> {
-            log.error("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
-            completableFuture.completeExceptionally(exception);
+            log.error("Transaction {} abort on topic {}.", txnID.toString(), topic.getName(), exception.getCause());
+            completableFuture.completeExceptionally(exception.getCause());
             return null;
         });
         return completableFuture;