You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:12:53 UTC
[pulsar] branch branch-2.7 updated (2cb726d -> 139060b)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.
from 2cb726d Fix the interceptor that not handle boundary for multipart/form-data (#9247)
new 094643b Fix interceptor disabled in ResponseHandlerFilter.java (#9252)
new 139060b PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../apache/pulsar/broker/service/ServerCnx.java | 120 +++++++++++++--------
.../pulsar/broker/web/ResponseHandlerFilter.java | 3 +-
.../broker/intercept/InterceptFilterOutTest.java | 26 +++++
.../pulsar/broker/service/ServerCnxTest.java | 6 +-
4 files changed, 105 insertions(+), 50 deletions(-)
[pulsar] 02/02: PLSR-1456: Fix race condition on producer/consumer
maps in ServerCnx (#9256)
Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 139060b9820a0bbda5414d1b154b9a73f74546b8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Jan 21 22:52:45 2021 +0100
PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
ServerCnx had a callback that was called from Producer/Consumer which
would remove the producer/consumer from its map using only the
ID. However, it is possible that this callback runs when the
producer/consumer had already been removed from the map and another
producer/consumer added in its place.
The solution is to use both the key and value when removing from the
map.
The change also updates the log messages to include the producerId and
consumerId in a format that makes all log messages for an individual
producerId/consumerId easier to find.
A test has been changed because the test was depending on the broken
behaviour. What was happening was that the fail topic producer was
failing to create a producer, and when this happened it removed the
producer future for the successful producer. Then, when the third
producer tries to connect, it manages to create the producer on
the connection, but fails as there is already a producer with that
name on the topic. The correct behaviour is that it should see the
successful producer future for that ID and respond with success.
Co-authored-by: Ivan Kelly <ik...@splunk.com>
(cherry picked from commit 1e4c3ec4f55ad0b0729f2849915f6b4d9e426bb1)
---
.../apache/pulsar/broker/service/ServerCnx.java | 120 +++++++++++++--------
.../pulsar/broker/service/ServerCnxTest.java | 6 +-
2 files changed, 77 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e931c94..bb169b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -874,8 +874,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
- log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress,
- consumerId, consumer);
+ log.info("[{}] Consumer with the same id is already created:"
+ + " consumerId={}, consumer={}",
+ remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
@@ -884,14 +885,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
- log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", remoteAddress,
- topicName, subscriptionName, consumerId);
+ log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if(!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
}else {
error = getErrorCode(existingConsumerFuture);
- consumers.remove(consumerId);
+ consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
@@ -962,11 +963,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
- log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
- subscriptionName, exception.getCause().getMessage());
+ log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+ remoteAddress, topicName, subscriptionName,
+ consumerId, exception.getCause().getMessage());
} else {
- log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
- subscriptionName, exception.getCause().getMessage(), exception);
+ log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+ remoteAddress, topicName, subscriptionName,
+ consumerId, exception.getCause().getMessage(), exception);
}
// If client timed out, the future would have been completed by subsequent close.
@@ -1056,10 +1059,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (existingProducerFuture != null) {
if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
Producer producer = existingProducerFuture.getNow(null);
- log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
- producerId, producer);
+ log.info("[{}] Producer with the same id is already created:"
+ + " producerId={}, producer={}", remoteAddress, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());
+
return null;
} else {
// There was an early request to create a producer with
@@ -1075,12 +1079,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
} else {
error = getErrorCode(existingProducerFuture);
// remove producer with producerId as it's already completed with exception
- producers.remove(producerId);
+ producers.remove(producerId, existingProducerFuture);
}
- log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
- producerId, topicName);
+ log.warn("[{}][{}] Producer with id is already present on the connection,"
+ + " producerId={}", remoteAddress, topicName, producerId);
commandSender.sendErrorResponse(requestId, error,
- "Producer is already present on the connection");
+ "Producer is already present on the connection");
+
return null;
}
}
@@ -1154,6 +1159,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
remoteAddress, producer);
producerFuture.completeExceptionally(
new IllegalStateException("Producer created after connection was closed"));
+
}
} catch (Exception ise) {
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
@@ -1174,7 +1180,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (!(cause instanceof ServiceUnitNotReadyException)) {
// Do not print stack traces for expected exceptions
- log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+ log.error("[{}] Failed to create topic {}, producerId={}",
+ remoteAddress, topicName, producerId, exception);
}
// If client timed out, the future would have been completed
@@ -1408,7 +1415,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
- log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
+ log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Producer was not registered on the connection");
return;
@@ -1418,12 +1425,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
// We have received a request to close the producer before it was actually completed, we have marked the
// producer future as failed and we can tell the client the close operation was successful.
- log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId);
+ log.info("[{}] Closed producer before its creation was completed. producerId={}",
+ remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
} else if (producerFuture.isCompletedExceptionally()) {
- log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId);
+ log.info("[{}] Closed producer that already failed to be created. producerId={}",
+ remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
@@ -1431,11 +1440,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// Proceed with normal close, the producer
Producer producer = producerFuture.getNow(null);
- log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);
+ log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
+ producer.getTopic(), producer.getProducerName(), remoteAddress, producerId);
producer.close(true).thenAccept(v -> {
- log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
- remoteAddress);
+ log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
+ producer.getTopic(), producer.getProducerName(),
+ remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
});
@@ -1444,14 +1455,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
checkArgument(state == State.Connected);
- log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId());
+ log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId());
long requestId = closeConsumer.getRequestId();
long consumerId = closeConsumer.getConsumerId();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
- log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
+ log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
return;
}
@@ -1461,13 +1472,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// We have received a request to close the consumer before it was actually completed, we have marked the
// consumer future as failed and we can tell the client the close operation was successful. When the actual
// create operation will complete, the new consumer will be discarded.
- log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId);
+ log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
+ remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}
if (consumerFuture.isCompletedExceptionally()) {
- log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId);
+ log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
+ remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}
@@ -1478,7 +1491,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
consumer.close();
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
- log.info("[{}] Closed consumer {}", remoteAddress, consumer);
+ log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -1879,13 +1892,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to producer
- if (log.isDebugEnabled()) {
- log.debug("[{}] Removed producer: {}", remoteAddress, producer);
- }
- long producerId = producer.getProducerId();
- producers.remove(producerId);
+ safelyRemoveProducer(producer);
if (remoteEndpointProtocolVersion >= v5.getNumber()) {
- ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
+ ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
} else {
close();
}
@@ -1895,13 +1904,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to consumer
- if (log.isDebugEnabled()) {
- log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
- }
- long consumerId = consumer.consumerId();
- consumers.remove(consumerId);
+ safelyRemoveConsumer(consumer);
if (remoteEndpointProtocolVersion >= v5.getNumber()) {
- ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
+ ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
} else {
close();
}
@@ -1922,19 +1927,42 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
public void removedConsumer(Consumer consumer) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
- }
-
- consumers.remove(consumer.consumerId());
+ safelyRemoveConsumer(consumer);
}
@Override
public void removedProducer(Producer producer) {
+ safelyRemoveProducer(producer);
+ }
+
+ private void safelyRemoveProducer(Producer producer) {
+ long producerId = producer.getProducerId();
if (log.isDebugEnabled()) {
- log.debug("[{}] Removed producer: {}", remoteAddress, producer);
+ log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer);
+ }
+ CompletableFuture<Producer> future = producers.get(producerId);
+ if (future != null) {
+ future.whenComplete((producer2, exception) -> {
+ if (exception != null || producer2 == producer) {
+ producers.remove(producerId, future);
+ }
+ });
+ }
+ }
+
+ private void safelyRemoveConsumer(Consumer consumer) {
+ long consumerId = consumer.consumerId();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer);
+ }
+ CompletableFuture<Consumer> future = consumers.get(consumerId);
+ if (future != null) {
+ future.whenComplete((consumer2, exception) -> {
+ if (exception != null || consumer2 == consumer) {
+ consumers.remove(consumerId, future);
+ }
+ });
}
- producers.remove(producer.getProducerId());
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cc7216b..ba78054 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -953,10 +953,10 @@ public class ServerCnxTest {
producerName, Collections.emptyMap());
channel.writeInbound(createProducer3);
- // 3rd producer fails because 2nd is already connected
+ // 3rd producer succeeds because 2nd is already connected
response = getResponse();
- assertEquals(response.getClass(), CommandError.class);
- assertEquals(((CommandError) response).getRequestId(), 4);
+ assertEquals(response.getClass(), CommandProducerSuccess.class);
+ assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
Thread.sleep(500);
[pulsar] 01/02: Fix interceptor disabled in
ResponseHandlerFilter.java (#9252)
Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 094643b652561b1347339e2cca90bc0d47ac2dc3
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jan 21 15:16:06 2021 +0800
Fix interceptor disabled in ResponseHandlerFilter.java (#9252)
### Motivation
Fix interceptor disabled in ResponseHandlerFilter.java
### Verifying this change
New unit test added.
(cherry picked from commit de7b59d784f5e54b0031f524b08f2f03ca88e2ca)
---
.../pulsar/broker/web/ResponseHandlerFilter.java | 3 ++-
.../broker/intercept/InterceptFilterOutTest.java | 26 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index d7d16f9..9091d05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -66,7 +66,8 @@ public class ResponseHandlerFilter implements Filter {
/* connection is already invalidated */
}
}
- if (!StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
+ if (interceptorEnabled
+ && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
interceptor.onWebserviceResponse(request, response);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 53c5463..fd13409 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -110,6 +110,32 @@ public class InterceptFilterOutTest {
}
}
+ @Test
+ public void testShouldNotInterceptWhenInterceptorDisabled() throws Exception {
+ CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+ PulsarService pulsarService = Mockito.mock(PulsarService.class);
+ Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+ Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+ ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+ // Disable the broker interceptor
+ Mockito.doReturn(Sets.newHashSet()).when(conf).getBrokerInterceptors();
+ Mockito.doReturn(conf).when(pulsarService).getConfig();
+ ResponseHandlerFilter filter = new ResponseHandlerFilter(pulsarService);
+
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ FilterChain chain = Mockito.mock(FilterChain.class);
+ Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+ HttpServletRequestWrapper mockInputStream = new MockRequestWrapper(request);
+ Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+ Mockito.doReturn(new StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+ // Should not be intercepted since the broker interceptor disabled.
+ Mockito.doReturn("application/json").when(request).getContentType();
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 0);
+ }
+
private static class MockRequestWrapper extends HttpServletRequestWrapper {
public MockRequestWrapper(HttpServletRequest request) {