You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/06 02:51:20 UTC

[incubator-pulsar] branch master updated: Throw ProducerBusy when producer with same name is already connected (#1174)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ade5c  Throw ProducerBusy when producer with same name is already connected (#1174)
03ade5c is described below

commit 03ade5c078b85111d2a70ad4b9a3fedaa7e7a959
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 5 18:51:18 2018 -0800

    Throw ProducerBusy when producer with same name is already connected (#1174)
---
 .../broker/service/BrokerServiceException.java     |  2 ++
 .../broker/service/PersistentTopicE2ETest.java     | 36 ++++++++++++++++++++++
 .../pulsar/client/api/ProducerConfiguration.java   |  4 +++
 .../pulsar/client/api/PulsarClientException.java   |  6 ++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 ++
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  2 ++
 .../apache/pulsar/websocket/ProducerHandler.java   | 30 +++++++++++++-----
 8 files changed, 78 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 1127237..a8b4f35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -136,6 +136,8 @@ public class BrokerServiceException extends Exception {
     public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
         if (t instanceof ServerMetadataException) {
             return PulsarApi.ServerError.MetadataError;
+        } else if (t instanceof NamingException) {
+            return PulsarApi.ServerError.ProducerBusy;
         } else if (t instanceof PersistenceException) {
             return PulsarApi.ServerError.PersistenceError;
         } else if (t instanceof ConsumerBusyException) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 93ce4b5..3ce76d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -26,6 +32,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -39,6 +46,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -58,6 +68,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -68,6 +79,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1310,4 +1323,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         producer.close();
     }
 
+    @Test
+    public void testCreateProducerWithSameName() throws Exception {
+        String topic = "persistent://prop/use/ns-abc/testCreateProducerWithSameName";
+
+        ProducerConfiguration conf = new ProducerConfiguration();
+        conf.setProducerName("test-producer-a");
+
+        Producer p1 = pulsarClient.createProducer(topic, conf);
+
+        try {
+            pulsarClient.createProducer(topic, conf);
+            fail("Should have thrown ProducerBusyException");
+        } catch (ProducerBusyException e) {
+            // Expected
+        }
+
+        p1.close();
+
+        // Now p2 should succeed
+        Producer p2 = pulsarClient.createProducer(topic, conf);
+
+        p2.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 9bde4c8..a6d2a55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
@@ -88,6 +89,9 @@ public class ProducerConfiguration implements Serializable {
      * <p>
      * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique
      * across all Pulsar's clusters.
+     * <p>
+     * If a producer with the same name is already connected to a particular topic, the
+     * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
      *
      * @param producerName
      *            the custom name to use for the producer
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 5f98cb4..2bcd142 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -131,6 +131,12 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public static class ProducerBusyException extends PulsarClientException {
+        public ProducerBusyException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class ConsumerBusyException extends PulsarClientException {
         public ConsumerBusyException(String msg) {
             super(msg);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 85e2867..5ad4c66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -527,6 +527,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.AuthenticationException(errorMsg);
         case AuthorizationError:
             return new PulsarClientException.AuthorizationException(errorMsg);
+        case ProducerBusy:
+            return new PulsarClientException.ProducerBusyException(errorMsg);
         case ConsumerBusy:
             return new PulsarClientException.ConsumerBusyException(errorMsg);
         case MetadataError:
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index eef3651..9521594 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -70,6 +70,7 @@ public final class PulsarApi {
     ConsumerNotFound(13, 13),
     TooManyRequests(14, 14),
     TopicTerminatedError(15, 15),
+    ProducerBusy(16, 16),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -88,6 +89,7 @@ public final class PulsarApi {
     public static final int ConsumerNotFound_VALUE = 13;
     public static final int TooManyRequests_VALUE = 14;
     public static final int TopicTerminatedError_VALUE = 15;
+    public static final int ProducerBusy_VALUE = 16;
     
     
     public final int getNumber() { return value; }
@@ -110,6 +112,7 @@ public final class PulsarApi {
         case 13: return ConsumerNotFound;
         case 14: return TooManyRequests;
         case 15: return TopicTerminatedError;
+        case 16: return ProducerBusy;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index faa894f..d5661fa 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -109,6 +109,8 @@ enum ServerError {
     ConsumerNotFound = 13; // Consumer not found
     TooManyRequests = 14; // Error with too many simultaneously request
     TopicTerminatedError = 15; // The topic has been terminated
+
+    ProducerBusy         = 16; // Producer with same name is already connected
 }
 
 enum AuthMethod {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 13758b8..e551885 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
@@ -99,14 +100,11 @@ public class ProducerHandler extends AbstractWebSocketHandler {
                         request.getRemotePort(), topic);
             }
         } catch (Exception e) {
-            log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(),
-                    request.getRemotePort(), topic, e);
-            boolean configError = e instanceof IllegalArgumentException;
-            int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
-                    : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-            String errorMsg = configError ? "Invalid query-param " + e.getMessage() : "Failed to create producer";
+            log.warn("[{}:{}] Failed in creating producer on topic {}: {}", request.getRemoteAddr(),
+                    request.getRemotePort(), topic, e.getMessage());
+
             try {
-                response.sendError(errorCode, errorMsg);
+                response.sendError(getErrorCode(e), getErrorMessage(e));
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);
@@ -114,6 +112,24 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         }
     }
 
+    private static int getErrorCode(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return HttpServletResponse.SC_BAD_REQUEST;
+        } else if (e instanceof ProducerBusyException) {
+            return HttpServletResponse.SC_CONFLICT;
+        } else {
+            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    private static String getErrorMessage(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return "Invalid query params: " + e.getMessage();
+        } else {
+            return "Failed to create producer: " + e.getMessage();
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (producer != null) {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.