You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/06 02:51:20 UTC
[incubator-pulsar] branch master updated: Throw ProducerBusy when producer with same name is already connected (#1174)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03ade5c Throw ProducerBusy when producer with same name is already connected (#1174)
03ade5c is described below
commit 03ade5c078b85111d2a70ad4b9a3fedaa7e7a959
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 5 18:51:18 2018 -0800
Throw ProducerBusy when producer with same name is already connected (#1174)
---
.../broker/service/BrokerServiceException.java | 2 ++
.../broker/service/PersistentTopicE2ETest.java | 36 ++++++++++++++++++++++
.../pulsar/client/api/ProducerConfiguration.java | 4 +++
.../pulsar/client/api/PulsarClientException.java | 6 ++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 ++
.../apache/pulsar/common/api/proto/PulsarApi.java | 3 ++
pulsar-common/src/main/proto/PulsarApi.proto | 2 ++
.../apache/pulsar/websocket/ProducerHandler.java | 30 +++++++++++++-----
8 files changed, 78 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 1127237..a8b4f35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -136,6 +136,8 @@ public class BrokerServiceException extends Exception {
public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
if (t instanceof ServerMetadataException) {
return PulsarApi.ServerError.MetadataError;
+ } else if (t instanceof NamingException) {
+ return PulsarApi.ServerError.ProducerBusy;
} else if (t instanceof PersistenceException) {
return PulsarApi.ServerError.PersistenceError;
} else if (t instanceof ConsumerBusyException) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 93ce4b5..3ce76d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -18,6 +18,12 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -26,6 +32,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
@@ -39,6 +46,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -58,6 +68,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -68,6 +79,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1310,4 +1323,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
producer.close();
}
+ @Test
+ public void testCreateProducerWithSameName() throws Exception {
+ String topic = "persistent://prop/use/ns-abc/testCreateProducerWithSameName";
+
+ ProducerConfiguration conf = new ProducerConfiguration();
+ conf.setProducerName("test-producer-a");
+
+ Producer p1 = pulsarClient.createProducer(topic, conf);
+
+ try {
+ pulsarClient.createProducer(topic, conf);
+ fail("Should have thrown ProducerBusyException");
+ } catch (ProducerBusyException e) {
+ // Expected
+ }
+
+ p1.close();
+
+ // Now p2 should succeed
+ Producer p2 = pulsarClient.createProducer(topic, conf);
+
+ p2.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 9bde4c8..a6d2a55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -88,6 +89,9 @@ public class ProducerConfiguration implements Serializable {
* <p>
* When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
* across all Pulsar's clusters.
+ * <p>
+ * If a producer with the same name is already connected to a particular topic, the
+ * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
*
* @param producerName
* the custom name to use for the producer
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 5f98cb4..2bcd142 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -131,6 +131,12 @@ public class PulsarClientException extends IOException {
}
}
+ public static class ProducerBusyException extends PulsarClientException {
+ public ProducerBusyException(String msg) {
+ super(msg);
+ }
+ }
+
public static class ConsumerBusyException extends PulsarClientException {
public ConsumerBusyException(String msg) {
super(msg);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 85e2867..5ad4c66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -527,6 +527,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.AuthenticationException(errorMsg);
case AuthorizationError:
return new PulsarClientException.AuthorizationException(errorMsg);
+ case ProducerBusy:
+ return new PulsarClientException.ProducerBusyException(errorMsg);
case ConsumerBusy:
return new PulsarClientException.ConsumerBusyException(errorMsg);
case MetadataError:
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index eef3651..9521594 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -70,6 +70,7 @@ public final class PulsarApi {
ConsumerNotFound(13, 13),
TooManyRequests(14, 14),
TopicTerminatedError(15, 15),
+ ProducerBusy(16, 16),
;
public static final int UnknownError_VALUE = 0;
@@ -88,6 +89,7 @@ public final class PulsarApi {
public static final int ConsumerNotFound_VALUE = 13;
public static final int TooManyRequests_VALUE = 14;
public static final int TopicTerminatedError_VALUE = 15;
+ public static final int ProducerBusy_VALUE = 16;
public final int getNumber() { return value; }
@@ -110,6 +112,7 @@ public final class PulsarApi {
case 13: return ConsumerNotFound;
case 14: return TooManyRequests;
case 15: return TopicTerminatedError;
+ case 16: return ProducerBusy;
default: return null;
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index faa894f..d5661fa 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -109,6 +109,8 @@ enum ServerError {
ConsumerNotFound = 13; // Consumer not found
TooManyRequests = 14; // Error with too many simultaneous requests
TopicTerminatedError = 15; // The topic has been terminated
+
+ ProducerBusy = 16; // Producer with same name is already connected
}
enum AuthMethod {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 13758b8..e551885 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
@@ -99,14 +100,11 @@ public class ProducerHandler extends AbstractWebSocketHandler {
request.getRemotePort(), topic);
}
} catch (Exception e) {
- log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(),
- request.getRemotePort(), topic, e);
- boolean configError = e instanceof IllegalArgumentException;
- int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
- : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
- String errorMsg = configError ? "Invalid query-param " + e.getMessage() : "Failed to create producer";
+ log.warn("[{}:{}] Failed in creating producer on topic {}: {}", request.getRemoteAddr(),
+ request.getRemotePort(), topic, e.getMessage());
+
try {
- response.sendError(errorCode, errorMsg);
+ response.sendError(getErrorCode(e), getErrorMessage(e));
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
@@ -114,6 +112,24 @@ public class ProducerHandler extends AbstractWebSocketHandler {
}
}
+ private static int getErrorCode(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return HttpServletResponse.SC_BAD_REQUEST;
+ } else if (e instanceof ProducerBusyException) {
+ return HttpServletResponse.SC_CONFLICT;
+ } else {
+ return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static String getErrorMessage(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return "Invalid query params: " + e.getMessage();
+ } else {
+ return "Failed to create producer: " + e.getMessage();
+ }
+ }
+
@Override
public void close() throws IOException {
if (producer != null) {
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.