You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 13:26:44 UTC

[pulsar] branch branch-2.7 updated (d5ef20d -> adba3f3)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from d5ef20d  Fix cherry-pick compile issue for branch-2.7
     new d3c0715  Make failPendingMessages called from within the ProducerImpl object mutex (#10528)
     new 749c683  Fix partitioned system topic check bug (#10529)
     new f20e513  Reduce function name size to 53 for k8s runtime (#10531)
     new 59931bd  Enable AutoTopicCreationType partitioned by proxy (#8048)
     new b272df4  [Issue 10161] Fix missing LoggerFactoryPtr type. (#10164)
     new adba3f3  [Performance] Use single instance of parser (#10664)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../AuthenticationProviderToken.java               |   6 +-
 .../AuthenticationProviderTokenTest.java           |   6 +-
 .../pulsar/broker/systopic/SystemTopicClient.java  |   5 +
 .../NamespaceEventsSystemTopicServiceTest.java     |  14 +++
 pulsar-client-cpp/lib/Log4CxxLogger.h              |   5 +-
 pulsar-client-cpp/lib/Log4cxxLogger.cc             |   8 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  13 ++-
 .../runtime/kubernetes/KubernetesRuntime.java      |   2 +-
 .../pulsar/proxy/server/LookupProxyHandler.java    | 104 ++++++++++-----------
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  28 +++++-
 10 files changed, 118 insertions(+), 73 deletions(-)

[pulsar] 03/06: Reduce function name size to 53 for k8s runtime (#10531)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f20e513c910e80561f3eb2d29887ddc51566138c
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Fri May 14 21:57:02 2021 -0400

    Reduce function name size to 53 for k8s runtime (#10531)
    
    
    (cherry picked from commit 74c0c3157a4df06e91ce2ccc5bdd315c89db328a)
---
 .../apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index ed36a66..dc50835 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -107,7 +107,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
 public class KubernetesRuntime implements Runtime {
 
     private static final String ENV_SHARD_ID = "SHARD_ID";
-    private static final int maxJobNameSize = 55;
+    private static final int maxJobNameSize = 53;
     private static final int maxLabelSize = 63;
     public static final Pattern VALID_POD_NAME_REGEX =
             Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",

[pulsar] 04/06: Enable AutoTopicCreationType partitioned by proxy (#8048)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 59931bdca4ebb8dc011e9eeb6c9bc4e6240bd34f
Author: Rudy Steiner <ru...@163.com>
AuthorDate: Wed May 19 23:51:10 2021 +0800

    Enable AutoTopicCreationType partitioned by proxy (#8048)
    
    
    (cherry picked from commit c24df3355f4312b2eb4a62f0f0497367fac1dadc)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 104 ++++++++++-----------
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  28 +++++-
 2 files changed, 74 insertions(+), 58 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index f2d7242..82300a3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -216,65 +216,56 @@ public class LookupProxyHandler {
         }
     }
 
+    /**
+     *   Always get partition metadata from broker service.
+     *
+     *
+     **/
     private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
             long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
-        if (isBlank(brokerServiceURL)) {
-            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
-                    proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for topic {} is {}",
-                                    proxyConnection.clientAuthRole, topicName, metadata.partitions);
-                        }
-                        proxyConnection.ctx().writeAndFlush(
-                                Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
-                    }).exceptionally(ex -> {
-                        log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, topicName,
-                                ex.getMessage(), ex);
-                        proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
-                          getServerError(ex), ex.getMessage(), clientRequestId));
-                        return null;
-                    });
-        } else {
-            URI brokerURI;
-            try {
-                brokerURI = new URI(brokerServiceURL);
-            } catch (URISyntaxException e) {
-                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
-                        e.getMessage(), clientRequestId));
+        URI brokerURI;
+        try {
+            String availableBrokerServiceURL = getBrokerServiceUrl(clientRequestId);
+            if (availableBrokerServiceURL == null) {
+                log.warn("No available broker for {} to lookup partition metadata", topicName);
                 return;
             }
-            InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
-
-            if (log.isDebugEnabled()) {
-                log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
-                        topicName.getPartitionedTopicName(), clientRequestId);
-            }
+            brokerURI = new URI(availableBrokerServiceURL);
+        } catch (URISyntaxException e) {
+            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                    e.getMessage(), clientRequestId));
+            return;
+        }
+        InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
 
-            proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
-                // Connected to backend broker
-                long requestId = proxyConnection.newRequestId();
-                ByteBuf command;
-                command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
-                clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
-                    if (t != null) {
-                        log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
-                            t.getMessage(), t);
-                        proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
-                            t.getMessage(), clientRequestId));
-                    } else {
-                        proxyConnection.ctx().writeAndFlush(
-                            Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
-                    }
-                    proxyConnection.getConnectionPool().releaseConnection(clientCnx);
-                });
-            }).exceptionally(ex -> {
-                // Failed to connect to backend broker
-                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
-                        ex.getMessage(), clientRequestId));
-                return null;
-            });
+        if (log.isDebugEnabled()) {
+            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
+                    topicName.getPartitionedTopicName(), clientRequestId);
         }
+        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+            // Connected to backend broker
+            long requestId = proxyConnection.newRequestId();
+            ByteBuf command;
+            command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
+            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+                        t.getMessage(), t);
+                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
+                        t.getMessage(), clientRequestId));
+                } else {
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
+                }
+                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
+            });
+        }).exceptionally(ex -> {
+            // Failed to connect to backend broker
+            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    ex.getMessage(), clientRequestId));
+            return null;
+        });
     }
 
     public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
@@ -302,7 +293,7 @@ public class LookupProxyHandler {
 
     private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
                                             long clientRequestId) {
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -364,7 +355,7 @@ public class LookupProxyHandler {
         }
 
         final long clientRequestId = commandGetSchema.getRequestId();
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
         String topic = commandGetSchema.getTopic();
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
@@ -411,7 +402,10 @@ public class LookupProxyHandler {
 
     }
 
-    private String getServiceUrl(long clientRequestId) {
+    /**
+     *  Get default broker service url or discovery an available broker
+     **/
+    private String getBrokerServiceUrl(long clientRequestId) {
         if (isBlank(brokerServiceURL)) {
             ServiceLookupData availableBroker;
             try {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 187d56a..8106ef3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -23,14 +23,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
-
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -188,6 +187,29 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    /**
+     * test auto create partitioned topic by proxy
+     **/
+    @Test
+    public void testAutoCreateTopic() throws Exception{
+        int defaultPartition=2;
+        int defaultNumPartitions=pulsar.getConfiguration().getDefaultNumPartitions();
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(defaultPartition);
+        try {
+            @Cleanup
+            PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+              .build();
+            String topic = "persistent://sample/test/local/partitioned-proxy-topic";
+            CompletableFuture<List<String>> partitionNamesFuture = client.getPartitionsForTopic(topic);
+            List<String> partitionNames = partitionNamesFuture.get(30000, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(partitionNames.size(), defaultPartition);
+        }finally {
+            pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+            pulsar.getConfiguration().setDefaultNumPartitions(defaultNumPartitions);
+        }
+    }
+
     @Test
     public void testRegexSubscription() throws Exception {
         @Cleanup

[pulsar] 06/06: [Performance] Use single instance of parser (#10664)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit adba3f37ae00f675723ce5d9a28e5737ca654d2a
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Fri May 21 03:59:37 2021 -0600

    [Performance] Use single instance of parser (#10664)
    
    Fixes #10652
    
    This is a minor change that optimizes the AuthenticationProviderToken class to reuse
    a single instance of the parser instead of creating many instances.
    
    This minor change is covered by existing tests, with a small improvement
    so that the tests no longer use a deprecated method (`Jwts.parser()`).
    
    (cherry picked from commit a6bb98332224ca7ab3b5e9e76f60980a19d67b48)
---
 .../pulsar/broker/authentication/AuthenticationProviderToken.java   | 6 +++++-
 .../broker/authentication/AuthenticationProviderTokenTest.java      | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 5071298..80b9978 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -30,6 +30,7 @@ import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
 import io.jsonwebtoken.ExpiredJwtException;
+import io.jsonwebtoken.JwtParser;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Histogram;
 import org.apache.commons.lang3.StringUtils;
@@ -88,6 +89,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
     private SignatureAlgorithm publicKeyAlg;
     private String audienceClaim;
     private String audience;
+    private JwtParser parser;
 
     // config keys
     private String confTokenSecretKeySettingName;
@@ -122,6 +124,8 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
         this.audienceClaim = getTokenAudienceClaim(config);
         this.audience = getTokenAudience(config);
 
+        this.parser = Jwts.parserBuilder().setSigningKey(this.validationKey).build();
+
         if (audienceClaim != null && audience == null ) {
             throw new IllegalArgumentException("Token Audience Claim [" + audienceClaim
                                                + "] configured, but Audience stands for this broker not.");
@@ -186,7 +190,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
     @SuppressWarnings("unchecked")
     private Jwt<?, Claims> authenticateToken(final String token) throws AuthenticationException {
         try {
-            Jwt<?, Claims> jwt = Jwts.parserBuilder().setSigningKey(validationKey).build().parseClaimsJws(token);
+            Jwt<?, Claims> jwt = parser.parseClaimsJws(token);
 
             if (audienceClaim != null) {
                 Object object = jwt.getBody().get(audienceClaim);
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index fe4d6a7..5a2e1c4 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -86,8 +86,9 @@ public class AuthenticationProviderTokenTest {
                 .compact();
 
         @SuppressWarnings("unchecked")
-        Jwt<?, Claims> jwt = Jwts.parser()
+        Jwt<?, Claims> jwt = Jwts.parserBuilder()
                 .setSigningKey(AuthTokenUtils.decodeSecretKey(secretKey.getEncoded()))
+                .build()
                 .parse(token);
 
         assertNotNull(jwt);
@@ -107,8 +108,9 @@ public class AuthenticationProviderTokenTest {
                 Optional.empty());
 
         @SuppressWarnings("unchecked")
-        Jwt<?, Claims> jwt = Jwts.parser()
+        Jwt<?, Claims> jwt = Jwts.parserBuilder()
                 .setSigningKey(AuthTokenUtils.decodePublicKey(Decoders.BASE64.decode(publicKey), SignatureAlgorithm.RS256))
+                .build()
                 .parse(token);
 
         assertNotNull(jwt);

[pulsar] 02/06: Fix partitioned system topic check bug (#10529)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 749c6832f8d225718640515f66fec7346ff6f033
Author: hangc0276 <ch...@apache.org>
AuthorDate: Sat May 15 09:58:19 2021 +0800

    Fix partitioned system topic check bug (#10529)
    
    ### Motivation
    When checking whether a partitioned topic is a system topic, the result is always `false`. The check logic is:
    ```Java
    static boolean isSystemTopic(TopicName topicName) {
            return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
        }
    ```
    ```Java
    public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
    ```
    The local name of a partition of that topic looks like `__change_events-partition-0`, so the equality check never matches.
    
    ### Modification
    1. Trim the partition suffix for the topic local name if exists.
    2. Add a test to cover this case.
    
    (cherry picked from commit 4a8d40c7540c5ec337a4c086db13299102380e12)
---
 .../apache/pulsar/broker/systopic/SystemTopicClient.java   |  5 +++++
 .../systopic/NamespaceEventsSystemTopicServiceTest.java    | 14 ++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index c5a3352..855f30d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -169,6 +169,11 @@ public interface SystemTopicClient {
     }
 
     static boolean isSystemTopic(TopicName topicName) {
+        if (topicName.isPartitioned()) {
+            return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME
+                    .equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+        }
+
         return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 52da458..1363e1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -24,9 +24,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.events.TopicPoliciesEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -106,6 +108,18 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
         Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 0);
     }
 
+    @Test(timeOut = 30000)
+    public void checkSystemTopic() throws PulsarAdminException {
+        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";
+        admin.topics().createPartitionedTopic(normalTopic, 3);
+        TopicName systemTopicName = TopicName.get(systemTopic);
+        TopicName normalTopicName = TopicName.get(normalTopic);
+
+        Assert.assertEquals(SystemTopicClient.isSystemTopic(systemTopicName), true);
+        Assert.assertEquals(SystemTopicClient.isSystemTopic(normalTopicName), false);
+    }
+
     private void prepareData() throws PulsarAdminException {
         admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
         admin.tenants().createTenant("system-topic",

[pulsar] 01/06: Make failPendingMessages called from within the ProducerImpl object mutex (#10528)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3c0715ec61260bbc31ceb96c6a6721fd5142496
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat May 15 09:59:04 2021 +0800

    Make failPendingMessages called from within the ProducerImpl object mutex (#10528)
    
    `failPendingMessages()` will traverse the `pendingMessages` queue and this operation is not atomic.
    `failPendingMessages()` should be called from within the `ProducerImpl` object mutex.
    
    https://github.com/apache/pulsar/blob/4d2d66d17da73426e2281251f3b05a63218b70fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1602-L1608
    
    There are 6 places in `ProducerImpl` that call this method, but 3 of them do not hold the lock.
    
    Add the object mutex (`synchronized (this)`) around unlocked calls.
    
    (cherry picked from commit cd7e3c05ce0ee64341341541a965df3305e6bfc1)
---
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 786af84..40eba48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -912,10 +912,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (previousState != State.Terminated && previousState != State.Closed) {
             log.info("[{}] [{}] The topic has been terminated", topic, producerName);
             setClientCnx(null);
-
-            failPendingMessages(cnx,
-                new PulsarClientException.TopicTerminatedException(
-                    format("The topic %s that the producer %s produces to has been terminated", topic, producerName)));
+            synchronized (this) {
+                failPendingMessages(cnx,
+                        new PulsarClientException.TopicTerminatedException(
+                                format("The topic %s that the producer %s produces to has been terminated", topic, producerName)));
+            }
         }
     }
 
@@ -1387,7 +1388,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                     if (cause instanceof PulsarClientException.TopicTerminatedException) {
                         setState(State.Terminated);
-                        failPendingMessages(cnx(), (PulsarClientException) cause);
+                        synchronized (this) {
+                            failPendingMessages(cnx(), (PulsarClientException) cause);
+                        }
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //

[pulsar] 05/06: [Issue 10161] Fix missing LoggerFactoryPtr type. (#10164)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b272df4e64a8f648c52837f66cbc3bda414ed135
Author: Andreas Neustifter <an...@gmail.com>
AuthorDate: Fri May 21 14:48:18 2021 +0200

    [Issue 10161] Fix missing LoggerFactoryPtr type. (#10164)
    
    Fixes #10161.
    
    ### Motivation
    
    With issue #7132 the LoggerFactoryPtr was removed in favour of relevant memory-safe(r) pointers.
    
    That change did not update the Log4Cxx implementation. Fix this by returning a
    std::unique_ptr<LoggerFactory>, as the other LoggerFactory implementations do.
    
    (cherry picked from commit d700f8d742f3a4be8330b773e27f59bbc528926f)
---
 pulsar-client-cpp/lib/Log4CxxLogger.h  | 5 +++--
 pulsar-client-cpp/lib/Log4cxxLogger.cc | 8 ++++----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/Log4CxxLogger.h b/pulsar-client-cpp/lib/Log4CxxLogger.h
index e3c0b56..cd2fe9e 100644
--- a/pulsar-client-cpp/lib/Log4CxxLogger.h
+++ b/pulsar-client-cpp/lib/Log4CxxLogger.h
@@ -28,11 +28,12 @@ namespace pulsar {
 
 class PULSAR_PUBLIC Log4CxxLoggerFactory : public LoggerFactory {
    public:
-    static LoggerFactoryPtr create();
-    static LoggerFactoryPtr create(const std::string& log4cxxConfFile);
+    static std::unique_ptr<LoggerFactory> create();
+    static std::unique_ptr<LoggerFactory> create(const std::string& log4cxxConfFile);
 
     Logger* getLogger(const std::string& fileName);
 };
+
 }  // namespace pulsar
 
 #endif
diff --git a/pulsar-client-cpp/lib/Log4cxxLogger.cc b/pulsar-client-cpp/lib/Log4cxxLogger.cc
index 5b1a2ab..f28da1a 100644
--- a/pulsar-client-cpp/lib/Log4cxxLogger.cc
+++ b/pulsar-client-cpp/lib/Log4cxxLogger.cc
@@ -62,7 +62,7 @@ class Log4CxxLogger : public Logger {
     }
 };
 
-LoggerFactoryPtr Log4CxxLoggerFactory::create() {
+std::unique_ptr<LoggerFactory> Log4CxxLoggerFactory::create() {
     if (!LogManager::getLoggerRepository()->isConfigured()) {
         LogManager::getLoggerRepository()->setConfigured(true);
         LoggerPtr root = log4cxx::Logger::getRootLogger();
@@ -73,10 +73,10 @@ LoggerFactoryPtr Log4CxxLoggerFactory::create() {
         root->addAppender(appender);
     }
 
-    return LoggerFactoryPtr(new Log4CxxLoggerFactory());
+    return std::unique_ptr<LoggerFactory>(new Log4CxxLoggerFactory());
 }
 
-LoggerFactoryPtr Log4CxxLoggerFactory::create(const std::string &log4cxxConfFile) {
+std::unique_ptr<LoggerFactory> Log4CxxLoggerFactory::create(const std::string &log4cxxConfFile) {
     try {
         log4cxx::PropertyConfigurator::configure(log4cxxConfFile);
     } catch (const std::exception &e) {
@@ -87,7 +87,7 @@ LoggerFactoryPtr Log4CxxLoggerFactory::create(const std::string &log4cxxConfFile
                   << std::endl;
     }
 
-    return LoggerFactoryPtr(new Log4CxxLoggerFactory());
+    return std::unique_ptr<LoggerFactory>(new Log4CxxLoggerFactory());
 }
 
 Logger *Log4CxxLoggerFactory::getLogger(const std::string &fileName) { return new Log4CxxLogger(fileName); }