You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/14 18:23:13 UTC
[activemq-artemis] branch master updated: ARTEMIS-2607 interceptor
returns false but processing continues
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new a8cf6b0 ARTEMIS-2607 interceptor returns false but processing continues
new 9a24b89 This closes #2974
a8cf6b0 is described below
commit a8cf6b04b4c54670f4b206cae917f4222e29201f
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jan 31 14:21:20 2020 -0600
ARTEMIS-2607 interceptor returns false but processing continues
---
.../amqp/broker/AMQPConnectionCallback.java | 8 +-
.../protocol/amqp/broker/AMQPSessionCallback.java | 69 ++++++++---------
.../amqp/broker/ProtonProtocolManager.java | 8 +-
.../amqp/proton/ProtonServerSenderContext.java | 4 +-
.../core/protocol/mqtt/MQTTProtocolHandler.java | 10 ++-
.../core/protocol/mqtt/MQTTProtocolManager.java | 8 +-
.../core/protocol/stomp/StompProtocolManager.java | 8 +-
.../artemis/core/server/ActiveMQServerLogger.java | 2 +-
.../spi/core/protocol/AbstractProtocolManager.java | 8 +-
.../amqp/AmqpSendReceiveInterceptorTest.java | 67 +++++++++++++++++
.../imported/MQTTRejectingInterceptorTest.java | 64 ++++++++++++++++
.../stomp/StompWithInterceptorsTest.java | 6 +-
.../stomp/StompWithRejectingInterceptorTest.java | 86 ++++++++++++++++++++++
13 files changed, 291 insertions(+), 57 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 1667945..a6cad89 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -294,11 +294,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
return null;
}
- public void invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- manager.invokeIncoming(message, connection);
+ /**
+ * Hands {@code message} to {@code manager.invokeIncoming} and returns that call's result unchanged,
+ * so the caller can see the outcome of the incoming interceptors (previously it was discarded).
+ */
+ public String invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return manager.invokeIncoming(message, connection);
}
- public void invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- manager.invokeOutgoing(message, connection);
+ /**
+ * Hands {@code message} to {@code manager.invokeOutgoing} and returns that call's result unchanged,
+ * so the caller can see the outcome of the outgoing interceptors (previously it was discarded).
+ */
+ public String invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return manager.invokeOutgoing(message, connection);
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a65361d..dad5d54 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -507,36 +507,39 @@ public class AMQPSessionCallback implements SessionCallback {
final Receiver receiver,
final RoutingContext routingContext) throws Exception {
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
- invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
- serverSession.send(transaction, message, directDeliver, false, routingContext);
-
- afterIO(new IOCallback() {
- @Override
- public void done() {
- connection.runLater(() -> {
- if (delivery.getRemoteState() instanceof TransactionalState) {
- TransactionalState txAccepted = new TransactionalState();
- txAccepted.setOutcome(Accepted.getInstance());
- txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
-
- delivery.disposition(txAccepted);
- } else {
- delivery.disposition(Accepted.getInstance());
- }
- delivery.settle();
- context.flow();
- connection.flush();
- });
- }
+ if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) {
+ serverSession.send(transaction, message, directDeliver, false, routingContext);
+
+ afterIO(new IOCallback() {
+ @Override
+ public void done() {
+ connection.runLater(() -> {
+ if (delivery.getRemoteState() instanceof TransactionalState) {
+ TransactionalState txAccepted = new TransactionalState();
+ txAccepted.setOutcome(Accepted.getInstance());
+ txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
+
+ delivery.disposition(txAccepted);
+ } else {
+ delivery.disposition(Accepted.getInstance());
+ }
+ delivery.settle();
+ context.flow();
+ connection.flush();
+ });
+ }
- @Override
- public void onError(int errorCode, String errorMessage) {
- connection.runNow(() -> {
- receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
- connection.flush();
- });
- }
- });
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ connection.runNow(() -> {
+ receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+ connection.flush();
+ });
+ }
+ });
+ } else {
+ rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message");
+ }
}
/** Will execute a Runnable on an Address when there's space in memory*/
@@ -692,12 +695,12 @@ public class AMQPSessionCallback implements SessionCallback {
manager.getServer().getSecurityStore().check(address, checkType, session);
}
- public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- protonSPI.invokeIncomingInterceptors(message, connection);
+ /**
+ * Runs the incoming interceptors through {@code protonSPI} and returns the result.
+ * The send path in this class only forwards the message to the server session when this returns {@code null}.
+ */
+ public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return protonSPI.invokeIncomingInterceptors(message, connection);
}
- public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- protonSPI.invokeOutgoingInterceptors(message, connection);
+ /**
+ * Runs the outgoing interceptors through {@code protonSPI} and returns the result.
+ * The sender context changed by this patch skips delivery when the result is non-null.
+ */
+ public String invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return protonSPI.invokeOutgoingInterceptors(message, connection);
}
public void addProducer(ServerProducer serverProducer) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 121020c..e0af481 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -294,12 +294,12 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
return prefixes;
}
- public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- super.invokeInterceptors(this.incomingInterceptors, message, connection);
+ /**
+ * Runs {@code incomingInterceptors} via the inherited {@code invokeInterceptors}.
+ *
+ * @return the class name of the interceptor that rejected {@code message}, or {@code null} if none did
+ */
+ public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return super.invokeInterceptors(this.incomingInterceptors, message, connection);
}
- public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
- super.invokeInterceptors(this.outgoingInterceptors, message, connection);
+ /**
+ * Runs {@code outgoingInterceptors} via the inherited {@code invokeInterceptors}.
+ *
+ * @return the class name of the interceptor that rejected {@code message}, or {@code null} if none did
+ */
+ public String invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+ return super.invokeInterceptors(this.outgoingInterceptors, message, connection);
}
public int getInitialRemoteMaxFrameSize() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 3bb9b4a..3ab2699 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -787,7 +787,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
- sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection());
+ if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
+ return;
+ }
// Let the Message decide how to present the message bytes
ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index e7388e8..8b0aae8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -100,7 +100,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MQTTUtil.logMessage(session.getState(), message, true);
- this.protocolManager.invokeIncoming(message, this.connection);
+ if (this.protocolManager.invokeIncoming(message, this.connection) != null) {
+ log.debugf("Interceptor rejected MQTT message: %s", message);
+ disconnect(true);
+ return;
+ }
switch (message.fixedHeader().messageType()) {
case CONNECT:
@@ -246,8 +250,10 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
private void sendToClient(MqttMessage message) {
+ // Consult the outgoing interceptors before anything else: a non-null result is treated as a
+ // rejection, so the message is dropped without being logged or written to the channel.
+ if (this.protocolManager.invokeOutgoing(message, connection) != null) {
+ return;
+ }
MQTTUtil.logMessage(session.getSessionState(), message, false);
- this.protocolManager.invokeOutgoing(message, connection);
ctx.write(message);
ctx.flush();
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index d1777ea..d0f5d12 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -209,12 +209,12 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
return websocketRegistryNames;
}
- public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
- super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
+ /**
+ * Runs {@code incomingInterceptors} via the inherited {@code invokeInterceptors}.
+ *
+ * @return the class name of the interceptor that rejected {@code mqttMessage}, or {@code null} if none did
+ */
+ public String invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
+ return super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
}
- public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
- super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
+ /**
+ * Runs {@code outgoingInterceptors} via the inherited {@code invokeInterceptors}.
+ *
+ * @return the class name of the interceptor that rejected {@code mqttMessage}, or {@code null} if none did
+ */
+ public String invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
+ return super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
public boolean isClientConnected(String clientId, MQTTConnection connection) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index e9bad87..2fe6224 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -154,8 +154,10 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
try {
- invokeInterceptors(this.incomingInterceptors, request, conn);
conn.logFrame(request, true);
+ if (invokeInterceptors(this.incomingInterceptors, request, conn) != null) {
+ return;
+ }
conn.handleFrame(request);
} finally {
server.getStorageManager().clearContext();
@@ -187,7 +189,9 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
// Public --------------------------------------------------------
public boolean send(final StompConnection connection, final StompFrame frame) {
- invokeInterceptors(this.outgoingInterceptors, frame, connection);
+ if (invokeInterceptors(this.outgoingInterceptors, frame, connection) != null) {
+ return false;
+ }
connection.logFrame(frame, false);
synchronized (connection) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index fb92e2d..84de5ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1967,7 +1967,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224082, value = "Failed to invoke an interceptor", format = Message.Format.MESSAGE_FORMAT)
- void failedToInvokeAninterceptor(@Cause Exception e);
+ // Renamed only to fix the capitalisation of "Interceptor"; the message id and text above are unchanged.
+ void failedToInvokeAnInterceptor(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224083, value = "Failed to close context", format = Message.Format.MESSAGE_FORMAT)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
index aaedecd..e27699b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
@@ -32,18 +32,20 @@ public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
- protected void invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
+ /**
+ * Runs {@code interceptors} in list order against {@code message} and stops at the first one
+ * whose {@code intercept} call returns {@code false}.
+ * <p>
+ * An exception thrown by an interceptor is logged and is NOT treated as a rejection: the loop
+ * simply moves on to the next interceptor.
+ *
+ * @param interceptors interceptors to run; {@code null} or empty means nothing is invoked
+ * @param message the protocol message handed to each interceptor
+ * @param connection the connection handed to each interceptor
+ * @return the class name of the interceptor that returned {@code false}, or {@code null} if none did
+ */
+ protected String invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
if (interceptors != null && !interceptors.isEmpty()) {
for (I interceptor : interceptors) {
try {
if (!interceptor.intercept(message, connection)) {
- break;
+ // Report which interceptor rejected the message instead of silently leaving the loop.
+ return interceptor.getClass().getName();
}
} catch (Exception e) {
- ActiveMQServerLogger.LOGGER.failedToInvokeAninterceptor(e);
+ ActiveMQServerLogger.LOGGER.failedToInvokeAnInterceptor(e);
}
}
}
+
+ // No interceptor returned false (or there were none): signal "accepted".
+ return null;
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
index 8dcb2bf..0dfcda2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
@@ -80,6 +80,73 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
connection.close();
}
+ /**
+ * Verifies that a message is not routed when an incoming interceptor returns {@code false}:
+ * the send must fail with the rejection text and a later receive on the same address must yield nothing.
+ */
+ @Test(timeout = 60000)
+ public void testRejectMessageWithIncomingInterceptor() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ // Register the rejecting interceptor before anything is sent; the latch records that it was invoked.
+ server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
+ @Override
+ public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
+ latch.countDown();
+ return false;
+ }
+ });
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + 1);
+ message.setText("Test-Message");
+ try {
+ sender.send(message);
+ // fail() throws an AssertionError, which the catch (Exception) below does not swallow.
+ fail("Sending message should have thrown exception here.");
+ } catch (Exception e) {
+ // Expected text combines the description and condition used when the broker rejects the delivery.
+ assertEquals("Interceptor rejected message [condition = failed]", e.getMessage());
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ // The rejected message must not have been routed, so nothing should arrive here.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(2);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNull(amqpMessage);
+ sender.close();
+ receiver.close();
+ connection.close();
+ }
+
+   /**
+    * Verifies that a message already stored on the broker is not delivered to a consumer when an
+    * outgoing interceptor returns {@code false}.
+    */
+   @Test(timeout = 60000)
+   public void testRejectMessageWithOutgoingInterceptor() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setText("Test-Message");
+      sender.send(message);
+
+      // Register the rejecting interceptor only after the send so that the inbound path is unaffected.
+      final CountDownLatch latch = new CountDownLatch(1);
+      server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
+         @Override
+         public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
+            latch.countDown();
+            return false;
+         }
+      });
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNull(amqpMessage);
+      // Expected value goes first so a failure reads "expected 0 but was N" rather than the reverse.
+      assertEquals(0, latch.getCount());
+      sender.close();
+      receiver.close();
+      connection.close();
+   }
+
private static final String ADDRESS = "address";
private static final String MESSAGE_ID = "messageId";
private static final String CORRELATION_ID = "correlationId";
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
new file mode 100644
index 0000000..cda06c0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+
+/**
+ * Verifies that an MQTT PUBLISH rejected by an incoming interceptor never reaches a subscriber.
+ */
+public class MQTTRejectingInterceptorTest extends MQTTTestSupport {
+
+   // NOTE(review): this rule is never referenced in the class; kept so the existing imports stay used.
+   @Rule
+   public ErrorCollector collector = new ErrorCollector();
+
+   @Test(timeout = 60000)
+   public void testRejectedMQTTMessage() throws Exception {
+      final String addressQueue = name.getMethodName();
+      final String msgText = "Test rejected message";
+
+      // Subscribe before the interceptor is installed so the subscription itself is unaffected.
+      final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
+      initializeConnection(subscribeProvider);
+      subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
+
+      // Reject PUBLISH packets only; every other packet type must pass so the publisher can still connect.
+      MQTTInterceptor incomingInterceptor = new MQTTInterceptor() {
+         @Override
+         public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
+            return packet.getClass() != MqttPublishMessage.class;
+         }
+      };
+
+      server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+      publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, false);
+      // The rejected publish must not be delivered to the subscriber.
+      assertNull(subscribeProvider.receive(3000));
+
+      subscribeProvider.disconnect();
+      publishProvider.disconnect();
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
index fad0e12..23def66 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
@@ -38,8 +38,8 @@ public class StompWithInterceptorsTest extends StompTestBase {
@Override
public List<String> getIncomingInterceptors() {
List<String> stompIncomingInterceptor = new ArrayList<>();
- stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$IncomingStompInterceptor");
- stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$CoreInterceptor");
+ // Class.getName() should yield the same nested-class names as the removed literals,
+ // without hard-coding the package and outer class.
+ stompIncomingInterceptor.add(IncomingStompInterceptor.class.getName());
+ stompIncomingInterceptor.add(CoreInterceptor.class.getName());
return stompIncomingInterceptor;
}
@@ -47,7 +47,7 @@ public class StompWithInterceptorsTest extends StompTestBase {
@Override
public List<String> getOutgoingInterceptors() {
List<String> stompOutgoingInterceptor = new ArrayList<>();
- stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$OutgoingStompInterceptor");
+ // Class.getName() should yield the same nested-class name as the removed literal,
+ // without hard-coding the package and outer class.
+ stompOutgoingInterceptor.add(OutgoingStompInterceptor.class.getName());
return stompOutgoingInterceptor;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java
new file mode 100644
index 0000000..736a56a
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that a STOMP SEND frame rejected by an incoming interceptor is still seen by that
+ * interceptor but does not put a message on the destination queue.
+ */
+public class StompWithRejectingInterceptorTest extends StompTestBase {
+
+   @Override
+   public List<String> getIncomingInterceptors() {
+      List<String> stompIncomingInterceptor = new ArrayList<>();
+      stompIncomingInterceptor.add(IncomingStompFrameRejectInterceptor.class.getName());
+
+      return stompIncomingInterceptor;
+   }
+
+   @Test
+   public void stompFrameInterceptor() throws Exception {
+      // The frame list is static, so clear anything recorded by an earlier test run.
+      IncomingStompFrameRejectInterceptor.interceptedFrames.clear();
+
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      conn.sendFrame(frame);
+      conn.disconnect();
+
+      Assert.assertTrue(Wait.waitFor(() -> IncomingStompFrameRejectInterceptor.interceptedFrames.size() == 3, 2000, 50));
+
+      // The interceptor must have seen every frame, including the SEND it rejected.
+      List<String> incomingCommands = new ArrayList<>(4);
+      incomingCommands.add("CONNECT");
+      incomingCommands.add("SEND");
+      incomingCommands.add("DISCONNECT");
+
+      for (int i = 0; i < IncomingStompFrameRejectInterceptor.interceptedFrames.size(); i++) {
+         Assert.assertEquals(incomingCommands.get(i), IncomingStompFrameRejectInterceptor.interceptedFrames.get(i).getCommand());
+      }
+
+      // The rejected SEND must not have produced a message on the queue.
+      Wait.assertFalse(() -> server.locateQueue(SimpleString.toSimpleString(getQueuePrefix() + getQueueName())).getMessageCount() > 0, 1000, 100);
+   }
+
+   /**
+    * Records every incoming frame and rejects SEND frames.
+    */
+   public static class IncomingStompFrameRejectInterceptor implements StompFrameInterceptor {
+
+      // Shared across instances; a synchronized list because the interceptor is called from another thread than the test reads it on.
+      static final List<StompFrame> interceptedFrames = Collections.synchronizedList(new ArrayList<>());
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         interceptedFrames.add(stompFrame);
+         // Compare by value: reference equality only works if both strings happen to be the same instance.
+         return !Stomp.Commands.SEND.equals(stompFrame.getCommand());
+      }
+   }
+}
\ No newline at end of file