You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:23 UTC

[pulsar] branch branch-2.8 updated (a94dbe6 -> fdcf5a4)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from a94dbe6  [pulsar-admin] Perfect judgment conditions of pulsar-admin (#12315)
     new 97c79e2  Enable CLI to publish non-batched messages (#12641)
     new 74c91a3  [Authorization] Support GET_METADATA topic op after enable auth (#12656)
     new 5e2657a  [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)
     new 985ef03  [pulsar-admin] Print topic internal info as formatted json (#12709)
     new 3c9894d  [Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714)
     new 0d2022d  Even if always compatible is set, Consumers cannot be created (#12721)
     new 2ab4dec  [Authorization] Support UNSUBSCRIBE namespace op after enable auth (#12742)
     new 230e1ac  [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)
     new d09854f  [Java Client] Let producer reconnect for state RegisteringSchema (#12781)
     new c43d1da  [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)
     new 0016151  Fix TopicPoliciesCacheNotInitException issue. (#12773)
     new fdcf5a4  Fix cherry-pick issue

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/bookkeeper.conf                               |   5 +
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |   1 +
 .../authorization/PulsarAuthorizationProvider.java |   2 +
 .../SystemTopicBasedTopicPoliciesService.java      |  42 +++++---
 .../broker/service/persistent/PersistentTopic.java |   6 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |   3 +
 .../broker/service/BrokerEntryMetadataE2ETest.java |  47 +++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  40 ++++++++
 .../api/AuthorizationProducerConsumerTest.java     |  42 +++++++-
 .../SchemaCompatibilityCheckTest.java              |  22 +++++
 .../pulsar/client/cli/PulsarClientToolTest.java    |  46 ++++++++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |   5 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |   5 +
 .../pulsar/client/impl/ConnectionHandler.java      |   1 +
 .../util/collections/ConcurrentOpenHashMap.java    |  26 +++++
 .../collections/ConcurrentOpenHashMapTest.java     |  32 ++++++
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 .../coordinator/impl/MLTransactionLogImpl.java     |   8 +-
 site2/docs/reference-cli-tools.md                  |   1 +
 21 files changed, 456 insertions(+), 36 deletions(-)

[pulsar] 09/12: [Java Client] Let producer reconnect for state RegisteringSchema (#12781)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d09854f868eb28067da8ed62b14df7f013b8ba32
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Mon Nov 15 01:39:45 2021 -0600

    [Java Client] Let producer reconnect for state RegisteringSchema (#12781)
    
    Motivation
    In the Java Client, if a producer is in the RegisteringSchema state, it is a valid state for it to reconnect. Fix the ConnectionHandler to align with this behavior.
    
    Modifications
    Update the isValidStateForReconnection method to return true for state RegisteringSchema.
    Verifying this change
    This change is a trivial rework / code cleanup without any test coverage.
    
    Does this pull request potentially affect one of the following parts:
    This update does not contain breaking changes.
    
    (cherry picked from commit 8d75eedbf6a37c1e77f6c5b6eac080ae00b77768)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8fb7ab4..13de530 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -152,6 +152,7 @@ public class ConnectionHandler {
         switch (state) {
             case Uninitialized:
             case Connecting:
+            case RegisteringSchema:
             case Ready:
                 // Ok
                 return true;

[pulsar] 02/12: [Authorization] Support GET_METADATA topic op after enable auth (#12656)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 74c91a30224cc25c20054e1f679b3e2667238891
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 8 15:21:43 2021 +0800

    [Authorization] Support GET_METADATA topic op after enable auth (#12656)
    
    ### Motivation
    Currently, we can get the internal stats of a topic through `bin/pulsar-admin topics stats-internal tn1/ns1/tp1` and also get ledger metadata by specifying flag `--metadata`.
    
    However I found that `PulsarAuthorizationProvider` lacks support for topic operation `GET_METADATA` when verifying the role's authorization, code as below:
    https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L1162-L1164
    https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L567-L596
    
    The purpose of this PR is to support role with `lookup` topic authorization to `GET_METADATA` of ledger.
    
    (cherry picked from commit e4767355bc7fa0a2e6bbb09acf1f93e4cd7cbe0b)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../auth/admin/GetMetadataOfTopicWithAuthTest.java | 213 +++++++++++++++++++++
 2 files changed, 214 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index e339987..06291ee 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -582,6 +582,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         switch (operation) {
             case LOOKUP:
             case GET_STATS:
+            case GET_METADATA:
                 isAuthorizedFuture = canLookupAsync(topicName, role, authData);
                 break;
             case PRODUCE:
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
new file mode 100644
index 0000000..7578629
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and without the proper permission.
+ */
+@Slf4j
+public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {
+
+    private static final String CLUSTER_PREFIX = "get-metadata-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private String superUserAuthToken;
+    private static final String PROXY_ROLE = "proxy";
+    private String proxyAuthToken;
+    private static final String REGULAR_USER_ROLE = "client";
+    private String clientAuthToken;
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+        // Before starting the cluster, generate the secret key and the token
+        // Use Zk container to have 1 container available before starting the cluster
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
+        final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        cmdContainer
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(ZKContainer.NAME)
+                .withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        createKeysAndTokens(cmdContainer);
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(2)
+                .numProxies(1)
+                .clusterName(clusterName)
+                .brokerEnvs(getBrokerSettingsEnvs())
+                .proxyEnvs(getProxySettingsEnvs())
+                .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .build();
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        markCurrentSetupNumberCleaned();
+        if (cmdContainer != null) {
+            cmdContainer.stop();
+        }
+        if (pulsarCluster != null) {
+            pulsarCluster.stop();
+        }
+    }
+
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                        "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        byte[] publicKeyBytes = DockerUtils
+                .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+                .getStdout();
+
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", REGULAR_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", SUPER_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", PROXY_ROLE)
+                .getStdout().trim();
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    @Test
+    public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
+        @Cleanup
+        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        @Cleanup
+        PulsarAdmin clientAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(clientAuthToken))
+                .build();
+
+        // create partitioned topic
+        superUserAdmin.topics().createPartitionedTopic("public/default/test", 1);
+
+        // do some operation without grant any permissions
+        try {
+            clientAdmin.topics().getInternalStats("public/default/test-partition-0", true);
+            fail("get internal stats and metadata operation should fail because the client hasn't permission to do");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 401);
+        }
+
+        // grant consume/produce permission to the role
+        superUserAdmin.topics().grantPermission("public/default/test",
+                REGULAR_USER_ROLE, Set.of(AuthAction.consume));
+
+        // then do some get internal stats and metadata operations again, it should success
+        PersistentTopicInternalStats internalStats = clientAdmin.topics()
+                .getInternalStats("public/default/test-partition-0", true);
+        assertNotNull(internalStats);
+    }
+}

[pulsar] 04/12: [pulsar-admin] Print topic internal info as formatted json (#12709)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 985ef03c4df6c680874835130121e9e33ba1beba
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Nov 10 16:24:49 2021 +0800

    [pulsar-admin] Print topic internal info as formatted json (#12709)
    
    (cherry picked from commit bcc8243c97a99173c9907500f25031fc905cc802)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java         | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index cdd91b1..3d522ff 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -28,6 +28,8 @@ import com.beust.jcommander.converters.CommaParameterSplitter;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -587,7 +589,8 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
-            String result = getTopics().getInternalInfo(topic);
+            String internalInfo = getTopics().getInternalInfo(topic);
+            JsonObject result = JsonParser.parseString(internalInfo).getAsJsonObject();
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(result));
         }

[pulsar] 07/12: [Authorization] Support UNSUBSCRIBE namespace op after enable auth (#12742)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2ab4decdce95c2aa1b9d233608027b35c7c69da8
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Fri Nov 12 10:29:56 2021 +0800

    [Authorization] Support UNSUBSCRIBE namespace op after enable auth (#12742)
    
    ### Motivation
    Currently, we can `unsubscribe` the given subscription on all topics on a namespace through `bin/pulsar-admin namespaces unsubscribe -s sub tn1/ns1`. However, role(not super-user or administrator) with `consume` auth action for namespace cannot perform `unsubscribe` operation when enable auth.
    
    The root of the problem is that `PulsarAuthorizationProvider` lacks support for namespace operation `UNSUBSCRIBE` when verifying the role's authorization, code as below:
    https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1667-L1669
    https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L522-L536
    
    The purpose of this PR is to support that role with `consume` namespace authorization could `unsubscribe` subscriptions on a namespace.
    
    (cherry picked from commit 8926631db9c9a78341726b53ca119ad4c69720e8)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../api/AuthorizationProducerConsumerTest.java     |  42 +++-
 .../auth/admin/GetMetadataOfTopicWithAuthTest.java | 213 ---------------------
 3 files changed, 42 insertions(+), 214 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 06291ee..411c253 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -546,6 +546,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                 isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
                 break;
             case GET_TOPICS:
+            case UNSUBSCRIBE:
                 isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
                 break;
             default:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index f94f6b7..1af36f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.api;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Maps;
@@ -27,6 +29,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +54,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TenantOperation;
@@ -174,6 +178,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         final String tenantRole = "tenant-role";
         final String subscriptionRole = "sub1-role";
         final String subscriptionName = "sub1";
+        final String subscriptionName2 = "sub2";
         final String namespace = "my-property/my-ns-sub-auth";
         final String topicName = "persistent://" + namespace + "/my-topic";
         Authentication adminAuthentication = new ClientAuthentication("superUser");
@@ -201,7 +206,18 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         superAdmin.tenants().createTenant("my-property",
                 new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
         superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
-        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
+
+        // subscriptionRole doesn't have topic-level authorization, so it will fail to get topic stats-internal info
+        try {
+            sub1Admin.topics().getInternalStats(topicName, true);
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateTopicOperation for operation [GET_STATS]"));
+        }
+
+        // grant topic consume authorization to the subscriptionRole
+        tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
         replacePulsarClient(PulsarClient.builder()
@@ -211,7 +227,17 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         // (1) Create subscription name
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscribe();
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName2)
+                .subscribe();
         consumer.close();
+        consumer2.close();
+
+        List<String> subscriptions = sub1Admin.topics().getSubscriptions(topicName);
+        assertEquals(subscriptions.size(), 2);
+
+        // now, subscriptionRole have consume authorization on topic, so it will successfully get topic internal stats
+        PersistentTopicInternalStats internalStats = superAdmin.topics().getInternalStats(topicName, true);
+        assertNotNull(internalStats);
 
         // verify tenant is able to perform all subscription-admin api
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
@@ -227,10 +253,24 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
 
+        // subscriptionRole doesn't have namespace-level authorization, so it will fail to unsubscribe namespace
+        try {
+            sub1Admin.namespaces().unsubscribeNamespace(namespace, subscriptionName2);
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateNamespaceOperation for operation [UNSUBSCRIBE]"));
+        }
+
         // grant namespace-level authorization to the subscriptionRole
         tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
+        // now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace
+        superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
+        subscriptions = sub1Admin.topics().getSubscriptions(topicName);
+        assertEquals(subscriptions.size(), 1);
+
         // subscriptionRole has namespace-level authorization
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
deleted file mode 100644
index 7578629..0000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.auth.admin;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-import com.google.common.io.Files;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.tests.TestRetrySupport;
-import org.apache.pulsar.tests.integration.containers.PulsarContainer;
-import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.utils.DockerUtils;
-import org.elasticsearch.common.collect.Set;
-import org.testcontainers.containers.Network;
-import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and without the proper permission.
- */
-@Slf4j
-public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {
-
-    private static final String CLUSTER_PREFIX = "get-metadata-auth";
-    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
-    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
-
-    private static final String SUPER_USER_ROLE = "super-user";
-    private String superUserAuthToken;
-    private static final String PROXY_ROLE = "proxy";
-    private String proxyAuthToken;
-    private static final String REGULAR_USER_ROLE = "client";
-    private String clientAuthToken;
-    private File publicKeyFile;
-
-    private PulsarCluster pulsarCluster;
-    private PulsarContainer cmdContainer;
-
-    @Override
-    @BeforeClass(alwaysRun = true)
-    protected void setup() throws Exception {
-        incrementSetupNumber();
-        // Before starting the cluster, generate the secret key and the token
-        // Use Zk container to have 1 container available before starting the cluster
-        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
-        final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
-        cmdContainer = new ZKContainer<>(cliContainerName);
-        cmdContainer
-                .withNetwork(Network.newNetwork())
-                .withNetworkAliases(ZKContainer.NAME)
-                .withEnv("zkServers", ZKContainer.NAME);
-        cmdContainer.start();
-
-        createKeysAndTokens(cmdContainer);
-
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-                .numBookies(2)
-                .numBrokers(2)
-                .numProxies(1)
-                .clusterName(clusterName)
-                .brokerEnvs(getBrokerSettingsEnvs())
-                .proxyEnvs(getProxySettingsEnvs())
-                .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
-                .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
-                .build();
-
-        pulsarCluster = PulsarCluster.forSpec(spec);
-        pulsarCluster.start();
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    public void cleanup() {
-        markCurrentSetupNumberCleaned();
-        if (cmdContainer != null) {
-            cmdContainer.stop();
-        }
-        if (pulsarCluster != null) {
-            pulsarCluster.stop();
-        }
-    }
-
-    private Map<String, String> getBrokerSettingsEnvs() {
-        Map<String, String> envs = new HashMap<>();
-        envs.put("authenticationEnabled", "true");
-        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
-        envs.put("authorizationEnabled", "true");
-        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
-        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
-        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
-        envs.put("authenticationRefreshCheckSeconds", "1");
-        envs.put("authenticateOriginalAuthData", "true");
-        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-        return envs;
-    }
-
-    private Map<String, String> getProxySettingsEnvs() {
-        Map<String, String> envs = new HashMap<>();
-        envs.put("authenticationEnabled", "true");
-        envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
-        envs.put("authorizationEnabled", "true");
-        envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
-        envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
-        envs.put("authenticationRefreshCheckSeconds", "1");
-        envs.put("forwardAuthorizationCredentials", "true");
-        envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-        return envs;
-    }
-
-    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
-        container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
-                        "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-
-        byte[] publicKeyBytes = DockerUtils
-                .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
-                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
-                .getStdout();
-
-        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
-        Files.write(publicKeyBytes, publicKeyFile);
-
-        clientAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
-                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", REGULAR_USER_ROLE)
-                .getStdout().trim();
-        log.info("Created client token: {}", clientAuthToken);
-
-        superUserAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
-                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", SUPER_USER_ROLE)
-                .getStdout().trim();
-        log.info("Created super-user token: {}", superUserAuthToken);
-
-        proxyAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
-                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", PROXY_ROLE)
-                .getStdout().trim();
-        log.info("Created proxy token: {}", proxyAuthToken);
-    }
-
-    @Test
-    public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
-        @Cleanup
-        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
-                .authentication(AuthenticationFactory.token(superUserAuthToken))
-                .build();
-
-        @Cleanup
-        PulsarAdmin clientAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
-                .authentication(AuthenticationFactory.token(clientAuthToken))
-                .build();
-
-        // create partitioned topic
-        superUserAdmin.topics().createPartitionedTopic("public/default/test", 1);
-
-        // do some operation without grant any permissions
-        try {
-            clientAdmin.topics().getInternalStats("public/default/test-partition-0", true);
-            fail("get internal stats and metadata operation should fail because the client hasn't permission to do");
-        } catch (PulsarAdminException e) {
-            assertEquals(e.getStatusCode(), 401);
-        }
-
-        // grant consume/produce permission to the role
-        superUserAdmin.topics().grantPermission("public/default/test",
-                REGULAR_USER_ROLE, Set.of(AuthAction.consume));
-
-        // then do some get internal stats and metadata operations again, it should success
-        PersistentTopicInternalStats internalStats = clientAdmin.topics()
-                .getInternalStats("public/default/test-partition-0", true);
-        assertNotNull(internalStats);
-    }
-}

[pulsar] 06/12: Even if always compatible is set, Consumers cannot be created (#12721)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0d2022dea5db7fa9057a769dcaacfd904c337b43
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Fri Nov 12 13:43:50 2021 +0800

    Even if always compatible is set, Consumers cannot be created (#12721)
    
    (cherry picked from commit c3da1452a444c9599cb85562a3faa82ddfdecec8)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  3 +++
 .../SchemaCompatibilityCheckTest.java              | 22 ++++++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 0eff36b..e1c9b13 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -270,6 +270,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     @Override
     public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                               SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 if (strategy == SchemaCompatibilityStrategy.BACKWARD
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 02913c6..293f71d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -395,6 +395,28 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "topic" + randomName(16);
+
+        String namespace = "test-namespace-" + randomName(16);
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(), tenant, namespace, topic).toString();
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
+
+        // set ALWAYS_COMPATIBLE
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+        // should not fail
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(topicName).subscribe();
+
+        producer.close();
+        consumer.close();
+    }
+
     @Test(dataProvider =  "CanReadLastSchemaCompatibilityStrategy")
     public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;

[pulsar] 11/12: Fix TopicPoliciesCacheNotInitException issue. (#12773)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 00161516c25fa5646cd9007d5232c80e5c058b51
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Nov 18 13:44:42 2021 +0800

    Fix TopicPoliciesCacheNotInitException issue. (#12773)
    
    Sometimes, we may get `TopicPoliciesCacheNotInitException` with below stack trace:
    ```
    15:45:47.020 [pulsar-web-41-3] INFO  org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1
    15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts
    java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
    	at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    ```
    
    This is because : https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312
    
    when `reader.readNextAsync()` throws exceptions, the msg will be null which will throw NPE without any catch block.
    
    (cherry picked from commit 11298144ac118cda951deffa092ab17110d254b7)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 42 ++++++++++++++--------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 40 +++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  8 +++--
 3 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 292a35c..bea4e8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -171,6 +171,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
+        if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+            NamespaceName namespace = topicName.getNamespaceObject();
+            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+        }
         if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
@@ -208,24 +212,29 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 result.complete(null);
             } else {
                 ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-                policyCacheInitMap.put(namespace, false);
-                CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                        creatSystemTopicClientWithRetry(namespace);
-                readerCaches.put(namespace, readerCompletableFuture);
-                readerCompletableFuture.whenComplete((reader, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                        result.completeExceptionally(ex);
-                    } else {
-                        initPolicesCache(reader, result);
-                        result.thenRun(() -> readMorePolicies(reader));
-                    }
-                });
+                prepareInitPoliciesCache(namespace, result);
             }
         }
         return result;
     }
 
+    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+                    creatSystemTopicClientWithRetry(namespace);
+            readerCaches.put(namespace, readerCompletableFuture);
+            readerCompletableFuture.whenComplete((reader, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                    result.completeExceptionally(ex);
+                } else {
+                    initPolicesCache(reader, result);
+                    result.thenRun(() -> readMorePolicies(reader));
+                }
+            });
+        }
+    }
+
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
         SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
@@ -292,6 +301,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                reader.closeAsync();
+                return;
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
@@ -300,6 +312,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 reader.getSystemTopic().getTopicName(), ex);
                         future.completeExceptionally(e);
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        reader.closeAsync();
+                        return;
                     }
                     refreshTopicPoliciesCache(msg);
                     if (log.isDebugEnabled()) {
@@ -314,7 +329,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 }
                 policyCacheInitMap.computeIfPresent(
                         reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
-
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 49d8699..80f3dc9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -32,8 +32,12 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
@@ -279,4 +283,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
         assertEquals(reader1, reader);
     }
+
+    @Test
+    public void testGetTopicPoliciesWithRetry() throws Exception {
+        Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+        initMapField.setAccessible(true);
+        Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+        initMap.remove(NamespaceName.get(NAMESPACE1));
+        Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+        readerCaches.setAccessible(true);
+        Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+        readers.remove(NamespaceName.get(NAMESPACE1));
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(500, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
+                .setMax(1000, TimeUnit.MILLISECONDS)
+                .create();
+        TopicPolicies initPolicy = TopicPolicies.builder()
+                .maxConsumerPerTopic(10)
+                .build();
+        ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
+        executors.schedule(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
+                } catch (Exception ignore) {}
+            }
+        }, 2000, TimeUnit.MILLISECONDS);
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get();
+            Assert.assertTrue(topicPolicies.isPresent());
+            if (topicPolicies.isPresent()) {
+                Assert.assertEquals(topicPolicies.get(), initPolicy);
+            }
+        });
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index f2324af..c044275 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog {
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
                                 ManagedLedgerConfig managedLedgerConfig) {
-        this.topicName = TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
+        this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
         managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
@@ -83,6 +82,11 @@ public class MLTransactionLogImpl implements TransactionLog {
         this.entryQueue = new SpscArrayQueue<>(2000);
     }
 
+    public static TopicName getMLTransactionLogName(TransactionCoordinatorID tcID) {
+        return TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
+    }
+
     @Override
     public CompletableFuture<Void> initialize() {
         CompletableFuture<Void> future = new CompletableFuture<>();

[pulsar] 08/12: [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 230e1ac9f97e292a01766766705d531f3eb8bf34
Author: JiangHaiting <ja...@qq.com>
AuthorDate: Mon Nov 15 08:52:31 2021 +0800

    [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)
    
    See #12723
    
    Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove null value   in a thread safe way.
    
    (cherry picked from commit a3fe00efc4ccba55a0f28fd02b535c6624e3ed0a)
---
 .../broker/service/persistent/PersistentTopic.java |  6 ++--
 .../util/collections/ConcurrentOpenHashMap.java    | 26 ++++++++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 32 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cdc0e00..3e7d733 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1544,7 +1544,7 @@ public class PersistentTopic extends AbstractTopic
 
     protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
-        replicators.computeIfAbsent(remoteCluster, r -> {
+        Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
             try {
                 return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
                         brokerService);
@@ -1555,8 +1555,8 @@ public class PersistentTopic extends AbstractTopic
             return null;
         });
         // clean up replicator if startup is failed
-        if (!isReplicatorStarted.get()) {
-            replicators.remove(remoteCluster);
+        if (replicator == null) {
+            replicators.removeNullValue(remoteCluster);
         }
         return isReplicatorStarted.get();
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 47927a9..2c7eed1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
     private static final Object EmptyKey = null;
     private static final Object DeletedKey = new Object();
 
+    /**
+     * This object is used to delete empty value in this map.
+     * EmptyValue.equals(null) = true.
+     */
+    private static final Object EmptyValue = new Object() {
+
+        @SuppressFBWarnings
+        @Override
+        public boolean equals(Object obj) {
+            return obj == null;
+        }
+
+        /**
+         * This is just for avoiding spotbugs errors
+         */
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    };
+
     private static final float MapFillFactor = 0.66f;
 
     private static final int DefaultExpectedItems = 256;
@@ -142,6 +164,10 @@ public class ConcurrentOpenHashMap<K, V> {
         return getSection(h).remove(key, value, (int) h) != null;
     }
 
+    public void removeNullValue(K key) {
+        remove(key, EmptyValue);
+    }
+
     private Section<K, V> getSection(long hash) {
         // Use 32 msb out of long to get the section
         final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index e18012c..254be51 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -369,6 +370,37 @@ public class ConcurrentOpenHashMapTest {
         assertNull(map.get(t1_b));
     }
 
+    @Test
+    public void testNullValue() {
+        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        String key = "a";
+        assertThrows(NullPointerException.class, () -> map.put(key, null));
+
+        //put a null value.
+        assertNull(map.computeIfAbsent(key, k -> null));
+        assertEquals(1, map.size());
+        assertEquals(1, map.keys().size());
+        assertEquals(1, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+        //test remove null value
+        map.removeNullValue(key);
+        assertTrue(map.isEmpty());
+        assertEquals(0, map.keys().size());
+        assertEquals(0, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+
+        //test not remove non-null value
+        map.put(key, "V");
+        assertEquals(1, map.size());
+        map.removeNullValue(key);
+        assertEquals(1, map.size());
+
+    }
+
     static final int Iterations = 1;
     static final int ReadIterations = 1000;
     static final int N = 1_000_000;

[pulsar] 10/12: [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c43d1da5cc28377a331403a5ccb7dc34f59fe13f
Author: ran <ga...@126.com>
AuthorDate: Tue Nov 16 13:16:58 2021 +0800

    [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)
    
    ### Motivation
    
    Currently, if the schema version of the message is null, the Pulsar SQL will encounter an NPE problem.
    
    ### Modifications
    
    Adjust logic for null schema version in `PulsarRecordCursor`.
    
    1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
    2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
    3. If the schema version of the message is null, use the latest schema of the topic.
    4. If the schema version of the message is not null, get the specific version schema by PulsarAdmin.
    5. If the final schema is null throw a runtime exception.
    
    (cherry picked from commit e5619cffce702d9f446c27e69927148e45797b28)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 12 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index f1e2bdb..b1230d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -479,14 +480,7 @@ public class PulsarRecordCursor implements RecordCursor {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
-        try {
-            if (schemaInfo == null) {
-                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
 
         Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
 
@@ -600,6 +594,38 @@ public class PulsarRecordCursor implements RecordCursor {
         return true;
     }
 
+    /**
+     * Get the schemaInfo of the message.
+     *
+     * 1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
+     * 2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
+     * 3. If the schema version of the message is null, use the schema info of pulsarSplit.
+     * 4. If the schema version of the message is not null, get the specific version schema by PulsarAdmin.
+     * 5. If the final schema is null throw a runtime exception.
+     */
+    private SchemaInfo getSchemaInfo(PulsarSplit pulsarSplit) {
+        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
+        if (schemaInfo != null) {
+            return schemaInfo;
+        }
+        try {
+            if (this.currentMessage.getSchemaVersion() == null) {
+                schemaInfo = pulsarSplit.getSchemaInfo();
+            } else {
+                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        if (schemaInfo == null) {
+            String schemaVersion = this.currentMessage.getSchemaVersion() == null
+                    ? "null" : BytesSchemaVersion.of(this.currentMessage.getSchemaVersion()).toString();
+            throw new RuntimeException("The specific version (" + schemaVersion + ") schema of the table "
+                    + pulsarSplit.getTableName() + " is null");
+        }
+        return schemaInfo;
+    }
+
     private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String schemaName) {
         if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) {
             return null;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 3a9233c..828ceef 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -102,8 +102,13 @@ public class PulsarSqlSchemaInfoProvider implements SchemaInfoProvider {
         ClassLoader originalContextLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
-            return pulsarAdmin.schemas()
-                    .getSchemaInfo(topicName.toString(), ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+            long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
+            SchemaInfo schemaInfo = pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
+            if (schemaInfo == null) {
+                throw new RuntimeException(
+                        "The specific version (" + version + ") schema of the topic " + topicName + " is null");
+            }
+            return schemaInfo;
         } finally {
             Thread.currentThread().setContextClassLoader(originalContextLoader);
         }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 8d52183..60d3fed 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -34,18 +34,31 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -56,6 +69,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -65,6 +80,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -323,9 +339,14 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
 
                                     MessageMetadata messageMetadata =
                                             new MessageMetadata()
-                                                    .setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                                    .setProducerName("test-producer")
+                                                    .setSequenceId(positions.get(topic))
                                                     .setPublishTime(System.currentTimeMillis());
 
+                                    if (i % 2 == 0) {
+                                        messageMetadata.setSchemaVersion(new LongSchemaVersion(1L).bytes());
+                                    }
+
                                     if (KeyValueEncodingType.SEPARATED.equals(schema.getKeyValueEncodingType())) {
                                         messageMetadata
                                                 .setPartitionKey(new String(schema
@@ -380,7 +401,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         PulsarSplit split = new PulsarSplit(0, pulsarConnectorId.toString(),
                 topicName.getNamespace(), topicName.getLocalName(), topicName.getLocalName(),
                 entriesNum,
-                new String(schema.getSchemaInfo().getSchema()),
+                new String(schema.getSchemaInfo().getSchema(),  "ISO8859-1"),
                 schema.getSchemaInfo().getType(),
                 0, entriesNum,
                 0, 0, TupleDomain.all(),
@@ -416,4 +437,86 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         private Double field3;
     }
 
+    @Test
+    public void testGetSchemaInfo() throws Exception {
+        String topic = "get-schema-test";
+        PulsarSplit pulsarSplit = Mockito.mock(PulsarSplit.class);
+        Mockito.when(pulsarSplit.getTableName()).thenReturn(TopicName.get(topic).getLocalName());
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn("public/default");
+        PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class);
+        Schemas schemas = Mockito.mock(Schemas.class);
+        Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas);
+        PulsarConnectorConfig connectorConfig = spy(new PulsarConnectorConfig());
+        Mockito.when(connectorConfig.getPulsarAdmin()).thenReturn(pulsarAdmin);
+        PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                new ArrayList<>(), pulsarSplit, connectorConfig, Mockito.mock(ManagedLedgerFactory.class),
+                new ManagedLedgerConfig(), null, null));
+
+        Class<PulsarRecordCursor> clazz =  PulsarRecordCursor.class;
+        Method getSchemaInfo = clazz.getDeclaredMethod("getSchemaInfo", PulsarSplit.class);
+        getSchemaInfo.setAccessible(true);
+        Field currentMessage = clazz.getDeclaredField("currentMessage");
+        currentMessage.setAccessible(true);
+        RawMessage rawMessage = Mockito.mock(RawMessage.class);
+        currentMessage.set(pulsarRecordCursor, rawMessage);
+
+        // If the schemaType of pulsarSplit is NONE or BYTES, using bytes schema
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.NONE);
+        SchemaInfo schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.BYTES);
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn(Schema.BYTEBUFFER.getSchemaInfo().getName());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        // If the schemaVersion of the message is not null, try to get the schema.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(0).bytes());
+        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
+                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.AVRO, schemaInfo.getType());
+
+        String schemaTopic = "persistent://public/default/" + topic;
+
+        // If the schemaVersion of the message is null and the schema of pulsarSplit is null, throw runtime exception.
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(null);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The message schema version is null and the latest schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the table " + topic + " is null"));
+        }
+
+        // If the schemaVersion of the message is null, try to get the latest schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+
+        // If the specific version schema is null, throw runtime exception.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(1L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The specific version " + 1 + " schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            String schemaVersion = BytesSchemaVersion.of(new LongSchemaVersion(1L).bytes()).toString();
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the topic " + schemaTopic + " is null"));
+        }
+
+        // Get the specific version schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(2L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+    }
+
 }

[pulsar] 05/12: [Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c9894d53c0198b5738597ddc2c9db2356c950c9
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Nov 11 17:46:55 2021 +0800

    [Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714)
    
    ### Motivation
    
    When the BrokerEntryMetadata is enabled, the total size in `ManagedLedgerImpl` is inaccurate. Because when the total size is updated in `OpAddEntry#safeRun`, the `dataLength` is the initial size of `data` when `OpAddEntry` is constructed, but `data` could be changed via `setData` method.
    
    The inaccurate total size could affect the retention size validation. Because in `ManagedLedgerImpl#internalTrimLedgers`, the total size reduces by the size of `LedgerInfo`, which is assigned from the `LedgerHandle#getLength()`. Therefore, the total size will become 0 or less before all ledgers are removed.
    
    ### Modifications
    
    - Update `dataLength` field in `setData` method.
    - Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`. It produces 10 messages and trigger the rollover manually so that the first `LedgerInfo` of the managed ledger contains the correct total bytes. Then compare the `totalSize` field with it to verify this fix works.
    
    (cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096)
---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  1 +
 .../broker/service/BrokerEntryMetadataE2ETest.java | 45 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 9106b4f..ecae17e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -314,6 +314,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
     }
 
     public void setData(ByteBuf data) {
+        this.dataLength = data.readableBytes();
         this.data = data;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 52c8375..92784bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,17 +18,25 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+import java.time.Duration;
 import java.util.List;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.assertj.core.util.Sets;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -214,4 +222,41 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
                 .subscribe();
         consumer.getLastMessageId();
     }
+
+    @Test
+    public void testManagedLedgerTotalSize() throws Exception {
+        final String topic = newTopicName();
+        final int messages = 10;
+
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.lookups().lookupTopic(topic);
+        final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger())
+                .orElse(null);
+        Assert.assertNotNull(managedLedger);
+        final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            producer.send("msg-" + i);
+        }
+
+        Assert.assertTrue(managedLedger.getTotalSize() > 0);
+
+        managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        managedLedger.getConfig().setMaxEntriesPerLedger(1);
+        managedLedger.rollCurrentLedgerIfFull();
+
+        Awaitility.await().atMost(Duration.ofSeconds(3))
+                .until(() -> managedLedger.getLedgersInfo().size() > 1);
+
+        final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
+        Assert.assertEquals(ledgerInfoList.size(), 2);
+        Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
+
+        cursor.close();
+    }
 }

[pulsar] 01/12: Enable CLI to publish non-batched messages (#12641)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 97c79e20be5e291d6de03daecd2507d96e0b27f9
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon Nov 8 20:19:02 2021 +0900

    Enable CLI to publish non-batched messages (#12641)
    
    ### Motivation
    
    Currently, messages produced by the `pulsar-client` command are always batched. However, zero queue consumers cannot receive these batched messages. I think it would be useful to be able to easily produce non-batched messages.
    
    ### Modifications
    
    Added an option to disable batching to the `pulsar-client` command:
    ```sh
    $ ./bin/pulsar-client produce -m hello -n 10 --disable-batching persistent://public/default/t1
    ```
    
    (cherry picked from commit a1bad71728d0b39e5b38fdd4ea9a7578629ac975)
---
 .../pulsar/client/cli/PulsarClientToolTest.java    | 46 ++++++++++++++++++++--
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  5 +++
 site2/docs/reference-cli-tools.md                  |  1 +
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index fcc6810..52dcf96 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -32,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -114,7 +117,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("non-durable");
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -169,7 +172,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("durable");
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -219,7 +222,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("encryption");
         final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
         final int numberOfMessages = 10;
 
@@ -262,4 +265,41 @@ public class PulsarClientToolTest extends BrokerTestBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testDisableBatching() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("useTls", "false");
+
+        final String topicName = getTopicWithRandomSuffix("disable-batching");
+        final int numberOfMessages = 5;
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+
+        PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
+        String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
+        Assert.assertEquals(pulsarClientTool1.run(args1), 0);
+
+        PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
+        String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
+        Assert.assertEquals(pulsarClientTool2.run(args2), 0);
+
+        for (int i = 0; i < numberOfMessages * 2; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg);
+            if (i < numberOfMessages) {
+                Assert.assertEquals(new String(msg.getData()), "batched");
+                Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
+            } else {
+                Assert.assertEquals(new String(msg.getData()), "non-batched");
+                Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
+            }
+        }
+    }
+
+    private static String getTopicWithRandomSuffix(String localNameBase) {
+        return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 326e221..e1d6aca 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -98,6 +98,9 @@ public class CmdProduce {
                description = "Rate (in msg/sec) at which to produce," +
                        " value 0 means to produce messages as fast as possible.")
     private double publishRate = 0;
+
+    @Parameter(names = { "-db", "--disable-batching" }, description = "Disable batch sending of messages")
+    private boolean disableBatching = false;
     
     @Parameter(names = { "-c",
             "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
@@ -216,6 +219,8 @@ public class CmdProduce {
             if (this.chunkingAllowed) {
                 producerBuilder.enableChunking(true);
                 producerBuilder.enableBatching(false);
+            } else if (this.disableBatching) {
+                producerBuilder.enableBatching(false);
             }
             if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
                 producerBuilder.addEncryptionKey(this.encKeyName);
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 04b323a..6ef73df 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -316,6 +316,7 @@ Options
 |`-m`, `--messages`|Comma-separated string of messages to send; either -m or -f must be specified|[]|
 |`-n`, `--num-produce`|The number of times to send the message(s); the count of messages/files * num-produce should be below 1000|1|
 |`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
+|`-db`, `--disable-batching`|Disable batch sending of messages|false|
 |`-c`, `--chunking`|Split the message and publish in chunks if the message size is larger than the allowed max size|false|
 |`-s`, `--separator`|Character to split messages string with.|","|
 |`-k`, `--key`|Message key to add|key=value string, like k1=v1,k2=v2.|

[pulsar] 03/12: [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5e2657a8d68d9614579f533eb3c179edb612baf9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Nov 10 06:03:06 2021 +0200

    [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)
    
    - https://github.com/apache/bookkeeper/pull/2646 added "Auto-throttle read operations" which is
      enabled by default
    
    (cherry picked from commit fc6d6dadaf77766189d0731196646d4c79874c8c)
---
 conf/bookkeeper.conf | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/conf/bookkeeper.conf b/conf/bookkeeper.conf
index 22a9f53..ac93efc 100644
--- a/conf/bookkeeper.conf
+++ b/conf/bookkeeper.conf
@@ -166,6 +166,11 @@ maxPendingReadRequestsPerThread=2500
 # avoid the executor queue to grow indefinitely
 maxPendingAddRequestsPerThread=10000
 
+# Use auto-throttling of the read-worker threads. This is done
+# to ensure the bookie is not using unlimited amount of memory
+# to respond to read-requests.
+readWorkerThreadsThrottlingEnabled=true
+
 # Option to enable busy-wait settings. Default is false.
 # WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
 # context switches. The spinning will consume 100% CPU even when bookie is not doing any work. It is recommended to

[pulsar] 12/12: Fix cherry-pick issue

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fdcf5a4da7a21863e19068c90e8000b6e8fd74a0
Author: penghui <pe...@apache.org>
AuthorDate: Thu Nov 18 17:29:46 2021 +0800

    Fix cherry-pick issue
---
 .../org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 92784bf..b87353d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -22,6 +22,8 @@ import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInf
 
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;