You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/15 13:53:09 UTC
[pulsar] branch master updated: [feature][broker] PIP-204: Extensions for broker interceptor (#17269)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7b714603a8 [feature][broker] PIP-204: Extensions for broker interceptor (#17269)
c7b714603a8 is described below
commit c7b714603a8a9a8bd401e6939dc811f763d65f72
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Thu Sep 15 21:52:58 2022 +0800
[feature][broker] PIP-204: Extensions for broker interceptor (#17269)
---
.../pulsar/broker/intercept/BrokerInterceptor.java | 54 +++++++++
.../BrokerInterceptorWithClassLoader.java | 39 ++++++
.../broker/intercept/BrokerInterceptors.java | 134 +++++++++++++--------
.../broker/service/AbstractBaseDispatcher.java | 2 +
.../org/apache/pulsar/broker/service/Producer.java | 49 +++++---
.../apache/pulsar/broker/service/ServerCnx.java | 52 +++-----
.../org/apache/pulsar/broker/service/Topic.java | 12 ++
.../broker/intercept/BrokerInterceptorTest.java | 45 ++++++-
.../BrokerInterceptorWithClassLoaderTest.java | 8 ++
.../broker/intercept/CounterBrokerInterceptor.java | 52 ++++++++
10 files changed, 350 insertions(+), 97 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 0c56c29b621..6ba72370d1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -52,18 +52,36 @@ public interface BrokerInterceptor extends AutoCloseable {
/**
* Intercept messages before sending them to the consumers.
+ * Deprecated, use {@link #beforeSendMessage(Subscription, Entry, long[], MessageMetadata, Consumer)} instead.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
*/
+ @Deprecated
default void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
}
+ /**
+ * Intercept messages before sending them to the consumers.
+ *
+ * @param subscription pulsar subscription
+ * @param entry entry
+ * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
+ * @param msgMetadata message metadata. The message metadata will be recycled after this call.
+ * @param consumer the consumer to which the entry is sent.
+ */
+ default void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata,
+ Consumer consumer) {
+ }
+
/**
* Called by the broker when a new connection is created.
*/
@@ -77,6 +95,18 @@ public interface BrokerInterceptor extends AutoCloseable {
Map<String, String> metadata){
}
+ /**
+ * Called by the broker when a producer is closed.
+ *
+ * @param cnx client Connection
+ * @param producer Producer object
+ * @param metadata A map of metadata
+ */
+ default void producerClosed(ServerCnx cnx,
+ Producer producer,
+ Map<String, String> metadata) {
+ }
+
/**
* Intercept after a consumer is created.
*
@@ -89,6 +119,30 @@ public interface BrokerInterceptor extends AutoCloseable {
Map<String, String> metadata) {
}
+ /**
+ * Called by the broker when a consumer is closed.
+ *
+ * @param cnx client Connection
+ * @param consumer Consumer object
+ * @param metadata A map of metadata
+ */
+ default void consumerClosed(ServerCnx cnx,
+ Consumer consumer,
+ Map<String, String> metadata) {
+ }
+
+ /**
+ * Intercept a message when the broker receives a send request.
+ *
+ * @param headersAndPayload entry's header and payload
+ * @param publishContext Publish Context
+ */
+ default void onMessagePublish(Producer producer,
+ ByteBuf headersAndPayload,
+ Topic.PublishContext publishContext) {
+
+ }
+
/**
* Intercept after a message is produced.
*
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index f04446fa9a0..f309aff9a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -63,6 +63,26 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
}
}
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata,
+ Consumer consumer) {
+ try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+ this.interceptor.beforeSendMessage(
+ subscription, entry, ackSet, msgMetadata, consumer);
+ }
+ }
+
+ @Override
+ public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
+ Topic.PublishContext publishContext) {
+ try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+ this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
+ }
+ }
+
@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
@@ -71,6 +91,15 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
}
}
+ @Override
+ public void producerClosed(ServerCnx cnx,
+ Producer producer,
+ Map<String, String> metadata) {
+ try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+ this.interceptor.producerClosed(cnx, producer, metadata);
+ }
+ }
+
@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
@@ -81,6 +110,16 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
}
}
+ @Override
+ public void consumerClosed(ServerCnx cnx,
+ Consumer consumer,
+ Map<String, String> metadata) {
+ try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+ this.interceptor.consumerClosed(cnx, consumer, metadata);
+ }
+ }
+
+
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 225066b9434..ade496c192f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -93,17 +93,39 @@ public class BrokerInterceptors implements BrokerInterceptor {
}
}
+ @Override
+ public void onMessagePublish(Producer producer,
+ ByteBuf headersAndPayload,
+ Topic.PublishContext publishContext) {
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.onMessagePublish(producer, headersAndPayload, publishContext);
+ }
+ }
+ }
+
@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.beforeSendMessage(
- subscription,
- entry,
- ackSet,
- msgMetadata);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
+ }
+ }
+ }
+
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata,
+ Consumer consumer) {
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
+ }
}
}
@@ -111,89 +133,103 @@ public class BrokerInterceptors implements BrokerInterceptor {
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.consumerCreated(
+ cnx,
+ consumer,
+ metadata);
+ }
}
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.consumerCreated(
- cnx,
- consumer,
- metadata);
+ }
+
+ @Override
+ public void consumerClosed(ServerCnx cnx,
+ Consumer consumer,
+ Map<String, String> metadata) {
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.consumerClosed(cnx, consumer, metadata);
+ }
}
}
@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
- if (interceptors == null || interceptors.isEmpty()) {
- return;
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.producerCreated(cnx, producer, metadata);
+ }
}
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.producerCreated(cnx, producer, metadata);
+ }
+
+ @Override
+ public void producerClosed(ServerCnx cnx,
+ Producer producer,
+ Map<String, String> metadata) {
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.producerClosed(cnx, producer, metadata);
+ }
}
}
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+ }
}
}
@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+ }
}
}
@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.messageAcked(cnx, consumer, ackCmd);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.messageAcked(cnx, consumer, ackCmd);
+ }
}
}
@Override
public void txnOpened(long tcId, String txnID) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.txnOpened(tcId, txnID);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.txnOpened(tcId, txnID);
+ }
}
}
@Override
public void txnEnded(String txnID, long txnAction) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.txnEnded(txnID, txnAction);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.txnEnded(txnID, txnAction);
+ }
}
}
@Override
public void onConnectionCreated(ServerCnx cnx) {
- if (interceptors == null || interceptors.isEmpty()) {
- return;
- }
- for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
- value.onConnectionCreated(cnx);
+ if (interceptorsEnabled()) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.onConnectionCreated(cnx);
+ }
}
}
@@ -237,4 +273,8 @@ public class BrokerInterceptors implements BrokerInterceptor {
public void close() {
interceptors.values().forEach(BrokerInterceptorWithClassLoader::close);
}
+
+ private boolean interceptorsEnabled() {
+ return interceptors != null && !interceptors.isEmpty();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 9843c48f46c..3e3fba07eac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -197,7 +197,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
+ // keep for compatibility in case users have implemented the old interface
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
+ interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index f8591a8447a..b5d87a46cfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -246,18 +246,22 @@ public class Producer {
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker) {
- topic.publishMessage(headersAndPayload,
- MessagePublishContext.get(this, sequenceId, msgIn,
- headersAndPayload.readableBytes(), batchSize,
- isChunked, System.nanoTime(), isMarker));
+ MessagePublishContext messagePublishContext =
+ MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
+ batchSize, isChunked, System.nanoTime(), isMarker);
+ this.cnx.getBrokerService().getInterceptor()
+ .onMessagePublish(this, headersAndPayload, messagePublishContext);
+ topic.publishMessage(headersAndPayload, messagePublishContext);
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
long batchSize, boolean isChunked, boolean isMarker) {
- topic.publishMessage(headersAndPayload,
- MessagePublishContext.get(this, lowestSequenceId,
- highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
- isChunked, System.nanoTime(), isMarker));
+ MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
+ highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
+ isChunked, System.nanoTime(), isMarker);
+ this.cnx.getBrokerService().getInterceptor()
+ .onMessagePublish(this, headersAndPayload, messagePublishContext);
+ topic.publishMessage(headersAndPayload, messagePublishContext);
}
private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -354,6 +358,8 @@ public class Producer {
private long highestSequenceId;
private long originalHighestSequenceId;
+ private long entryTimestamp;
+
public String getProducerName() {
return producer.getProducerName();
}
@@ -367,6 +373,15 @@ public class Producer {
return chunked;
}
+ @Override
+ public long getEntryTimestamp() {
+ return entryTimestamp;
+ }
+
+ @Override
+ public void setEntryTimestamp(long entryTimestamp) {
+ this.entryTimestamp = entryTimestamp;
+ }
@Override
public void setProperty(String propertyName, Object value){
if (this.propertyMap == null) {
@@ -483,10 +498,8 @@ public class Producer {
producer.chunkedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
- if (producer.cnx.getBrokerService().getInterceptor() != null){
- producer.cnx.getBrokerService().getInterceptor().messageProduced(
- (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
- }
+ producer.cnx.getBrokerService().getInterceptor().messageProduced(
+ (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
recycle();
}
@@ -534,6 +547,11 @@ public class Producer {
return batchSize;
}
+ @Override
+ public long getMsgSize() {
+ return msgSize;
+ }
+
@Override
public boolean isMarkerMessage() {
return isMarker;
@@ -730,9 +748,12 @@ public class Producer {
public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
- topic.publishTxnMessage(txnID, headersAndPayload,
+ MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
- headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
+ headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker);
+ this.cnx.getBrokerService().getInterceptor()
+ .onMessagePublish(this, headersAndPayload, messagePublishContext);
+ topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}
public SchemaVersion getSchemaVersion() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4fe703de878..f0db48e8869 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -310,9 +310,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
- if (brokerInterceptor != null) {
- brokerInterceptor.onConnectionClosed(this);
- }
+ brokerInterceptor.onConnectionClosed(this);
cnxsPerThread.get().remove(this);
@@ -326,6 +324,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
+ brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
}
});
@@ -339,6 +338,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
+ brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", consumer, e);
}
@@ -672,10 +672,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
- BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
- if (brokerInterceptor != null) {
- brokerInterceptor.onConnectionCreated(this);
- }
+ getBrokerService().getInterceptor().onConnectionCreated(this);
}
// According to auth result, send newConnected or newAuthChallenge command.
@@ -1137,9 +1134,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
- if (getBrokerService().getInterceptor() != null){
- getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
- }
+ getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
} else {
// The consumer future was completed before by a close command
try {
@@ -1480,10 +1475,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().
+ getBrokerService().getInterceptor().
producerCreated(this, producer, metadata);
- }
return;
} else {
// The producer's future was completed before by
@@ -1530,10 +1523,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().
- producerCreated(this, producer, metadata);
- }
+ getBrokerService().getInterceptor().
+ producerCreated(this, producer, metadata);
}
});
}
@@ -1620,10 +1611,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
- }
- }).exceptionally(e -> {
+ getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+ }).exceptionally(e -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
@@ -1816,6 +1805,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
+ getBrokerService().getInterceptor().producerClosed(this, producer, producer.getMetadata());
});
}
@@ -1859,6 +1849,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
+ getBrokerService().getInterceptor().consumerClosed(this, consumer, consumer.getMetadata());
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -2655,9 +2646,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void interceptCommand(BaseCommand command) throws InterceptException {
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().onPulsarCommand(command, this);
- }
+ getBrokerService().getInterceptor().onPulsarCommand(command, this);
}
public void closeProducer(Producer producer) {
@@ -2942,16 +2931,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
try {
val brokerInterceptor = getBrokerService().getInterceptor();
- if (brokerInterceptor != null) {
- brokerInterceptor.onPulsarCommand(command, this);
-
- CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
- if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
- Consumer consumer = consumerFuture.getNow(null);
- brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
- }
- } else {
- log.debug("BrokerInterceptor is not set in newMessageAndIntercept");
+ brokerInterceptor.onPulsarCommand(command, this);
+ CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
+ if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+ Consumer consumer = consumerFuture.getNow(null);
+ brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index b4f27adcc4a..312468933f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -101,6 +101,10 @@ public interface Topic {
return 1L;
}
+ default long getMsgSize() {
+ return -1L;
+ }
+
default boolean isMarkerMessage() {
return false;
}
@@ -115,6 +119,14 @@ public interface Topic {
default boolean isChunked() {
return false;
}
+
+ default long getEntryTimestamp() {
+ return -1L;
+ }
+
+ default void setEntryTimestamp(long entryTimestamp) {
+
+ }
}
CompletableFuture<Void> initialize();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index b2c3b8d711f..290cf19c8f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -146,6 +146,17 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
}
+ @Test
+ public void testProducerClose() throws PulsarClientException {
+ BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+ Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+ assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 0);
+ Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL).topic("test").create();
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
+ producer.close();
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 0);
+ }
+
@Test
public void testConsumerCreation() throws PulsarClientException {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
@@ -155,6 +166,35 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
}
+ @Test
+ public void testConsumerClose() throws PulsarClientException {
+ BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+ Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+ assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 0);
+ Consumer<String> consumer = pulsarClient
+ .newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
+ consumer.close();
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 0);
+ }
+
+ @Test
+ public void testMessagePublishAndProduced() throws PulsarClientException {
+ BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+ Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("test-before-send-message")
+ .create();
+
+ assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
+ assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
+ producer.send("hello world");
+ assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
+ assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);
+ }
+
@Test
public void testBeforeSendMessage() throws PulsarClientException {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
@@ -170,16 +210,17 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
.subscriptionName("test")
.subscribe();
- assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
+ assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),0);
producer.send("hello world");
- assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
+ assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "hello world");
Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCountAtConsumerLevel() == 1);
Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index 4f11792dbc2..758f5655453 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -69,6 +69,12 @@ public class BrokerInterceptorWithClassLoaderTest {
public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
+
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry, long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) {
+ assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+ }
@Override
public void onConnectionCreated(ServerCnx cnx) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
@@ -177,6 +183,8 @@ public class BrokerInterceptorWithClassLoaderTest {
// test beforeSendMessage
brokerInterceptorWithClassLoader
.beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null);
+ brokerInterceptorWithClassLoader
+ .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null, null);
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test close
brokerInterceptorWithClassLoader.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 25953a71fbd..dd83fd2a4ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -50,10 +50,12 @@ import org.eclipse.jetty.server.Response;
public class CounterBrokerInterceptor implements BrokerInterceptor {
private AtomicInteger beforeSendCount = new AtomicInteger();
+ private AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
private AtomicInteger count = new AtomicInteger();
private AtomicInteger connectionCreationCount = new AtomicInteger();
private AtomicInteger producerCount = new AtomicInteger();
private AtomicInteger consumerCount = new AtomicInteger();
+ private AtomicInteger messagePublishCount = new AtomicInteger();
private AtomicInteger messageCount = new AtomicInteger();
private AtomicInteger messageDispatchCount = new AtomicInteger();
private AtomicInteger messageAckCount = new AtomicInteger();
@@ -104,6 +106,16 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
producerCount.incrementAndGet();
}
+ @Override
+ public void producerClosed(ServerCnx cnx, Producer producer,
+ Map<String, String> metadata) {
+ if (log.isDebugEnabled()) {
+ log.debug("Producer with name={}, id={} closed",
+ producer.getProducerName(), producer.getProducerId());
+ }
+ producerCount.decrementAndGet();
+ }
+
@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
@@ -115,6 +127,26 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
consumerCount.incrementAndGet();
}
+ @Override
+ public void consumerClosed(ServerCnx cnx,
+ Consumer consumer,
+ Map<String, String> metadata) {
+ if (log.isDebugEnabled()) {
+ log.debug("Consumer with name={}, id={} closed",
+ consumer.consumerName(), consumer.consumerId());
+ }
+ consumerCount.decrementAndGet();
+ }
+
+ @Override
+ public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
+ if (log.isDebugEnabled()) {
+ log.debug("Message broker received topic={}, producer={}",
+ producer.getTopic().getName(), producer.getProducerName());
+ }
+ messagePublishCount.incrementAndGet();
+ }
+
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId,
@@ -154,6 +186,19 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
beforeSendCount.incrementAndGet();
}
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata,
+ Consumer consumer) {
+ if (log.isDebugEnabled()) {
+ log.debug("Send message to topic {}, subscription {}, consumer {}",
+ subscription.getTopic(), subscription.getName(), consumer.consumerName());
+ }
+ beforeSendCountAtConsumerLevel.incrementAndGet();
+ }
+
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
if (log.isDebugEnabled()) {
@@ -238,6 +283,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
public int getMessagePublishCount() {
+ return messagePublishCount.get();
+ }
+ public int getMessageProducedCount() {
return messageCount.get();
}
@@ -253,6 +301,10 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
return beforeSendCount.get();
}
+ public int getBeforeSendCountAtConsumerLevel() {
+ return beforeSendCountAtConsumerLevel.get();
+ }
+
public int getConnectionCreationCount() {
return connectionCreationCount.get();
}