You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2023/02/16 14:52:31 UTC
[nifi] branch main updated: NIFI-11144 Fix failing tests for ConsumeJMS/PublishJMS
This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0d4f1523fe NIFI-11144 Fix failing tests for ConsumeJMS/PublishJMS
0d4f1523fe is described below
commit 0d4f1523fed712623a66dcf52ec42f08a71c7e88
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Mon Feb 6 19:07:02 2023 +0100
NIFI-11144 Fix failing tests for ConsumeJMS/PublishJMS
This closes #6930.
Signed-off-by: Tamas Palfy <tp...@apache.org>
---
.../nifi/util/StandardProcessorTestRunner.java | 3 +-
.../nifi/jms/processors/AbstractJMSProcessor.java | 6 +-
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 3 +-
.../apache/nifi/jms/processors/JMSConsumer.java | 5 +-
.../org/apache/nifi/jms/processors/PublishJMS.java | 2 +-
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 76 +++++++++++++++++-----
.../nifi/jms/processors/ConsumeJMSManualTest.java | 23 +++----
.../jms/processors/JMSPublisherConsumerIT.java | 30 ++++++---
.../apache/nifi/jms/processors/PublishJMSIT.java | 44 +++++++------
.../jms/processors/helpers/AssertionUtils.java | 66 +++++++++++++++++++
10 files changed, 191 insertions(+), 67 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index b2b0557dad..63290be359 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -204,8 +204,7 @@ public class StandardProcessorTestRunner implements TestRunner {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (final Exception e) {
- e.printStackTrace();
- Assertions.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e);
+ Assertions.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e, e);
}
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index a1339dede1..aa8fa4dd64 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -185,14 +185,12 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
} catch (Exception e) {
getLogger().error("Failed to initialize JMS Connection Factory", e);
context.yield();
- return;
+ throw e;
}
}
try {
rendezvousWithJms(context, session, worker);
- } catch (Exception e) {
- getLogger().error("Error while trying to process JMS message", e);
} finally {
//in case of exception during worker's connection (consumer or publisher),
//an appropriate service is responsible to invalidate the worker.
@@ -209,7 +207,7 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
connectionFactoryProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
worker = buildTargetResource(context);
- }catch(Exception e) {
+ } catch (Exception e) {
getLogger().error("Failed to rebuild: " + connectionFactoryProvider);
worker = null;
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 7eee02c61a..6e99da3807 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -298,9 +298,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
}
});
} catch(Exception e) {
+ getLogger().error("Error while trying to process JMS message", e);
consumer.setValid(false);
context.yield();
- throw e; // for backward compatibility with exception handling in flows
+ throw e;
}
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 20876b7b21..4d856ca157 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -46,7 +46,7 @@ import java.util.Map;
/**
* Generic consumer of messages from JMS compliant messaging system.
*/
-final class JMSConsumer extends JMSWorker {
+class JMSConsumer extends JMSWorker {
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger);
@@ -83,6 +83,9 @@ final class JMSConsumer extends JMSWorker {
}
+ /**
+ * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
+ */
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index fb82d5097d..2a236adfb1 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -222,7 +222,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
processSession.getProvenanceReporter().send(flowFile, destinationName);
} catch (Exception e) {
processSession.transfer(flowFile, REL_FAILURE);
- this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
+ getLogger().error("Failed while sending message to JMS via " + publisher, e);
context.yield();
publisher.setValid(false);
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index cf8a7614cf..e61d1a38cc 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -23,6 +23,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
@@ -30,6 +31,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
@@ -62,12 +64,15 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class ConsumeJMSIT {
@@ -196,9 +201,7 @@ public class ConsumeJMSIT {
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-
- sender.jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage("unsupportedMessagePropertyKey", "unsupportedMessagePropertyValue"));
+ jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage("unsupportedMessagePropertyKey", "unsupportedMessagePropertyValue"));
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
@@ -226,9 +229,7 @@ public class ConsumeJMSIT {
private void testMessageTypeAttribute(String destinationName, final MessageCreator messageCreator, String expectedJmsMessageTypeAttribute) throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
- JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-
- sender.jmsTemplate.send(destinationName, messageCreator);
+ jmsTemplate.send(destinationName, messageCreator);
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
@@ -324,7 +325,8 @@ public class ConsumeJMSIT {
TestRunner runner = createNonSharedDurableConsumer(cf, destinationName);
runner.setThreadCount(2);
final TestRunner temp = runner;
- assertThrows(Throwable.class, () -> temp.run(1, true));
+
+ assertCausedBy(ProcessException.class, "Durable non shared subscriptions cannot work on multiple threads.", () -> temp.run(1, true));
runner = createNonSharedDurableConsumer(cf, destinationName);
// using one thread, it should not fail.
@@ -334,7 +336,7 @@ public class ConsumeJMSIT {
/**
* <p>
- * This test validates the connection resources are closed if the publisher is marked as invalid.
+ * This test validates the connection resources are closed if the consumer is marked as invalid.
* </p>
* <p>
* This tests validates the proper resources handling for TCP connections using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test, using some ActiveMQ's classes is possible to
@@ -356,7 +358,7 @@ public class ConsumeJMSIT {
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
- broker.setBrokerName("nifi7034publisher");
+ broker.setBrokerName("nifi7034consumer");
TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
int port = connector.getServer().getSocketAddress().getPort();
broker.start();
@@ -384,7 +386,9 @@ public class ConsumeJMSIT {
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
- assertThrows(AssertionError.class, () -> runner.run());
+ runner.run();
+ // since the worker is marked to invalid, we don't need to expect an exception here, because the worker recreation is handled automatically
+
assertFalse(tcpTransport.get().isConnected(), "It is expected transport be closed. ");
} finally {
if (broker != null) {
@@ -409,18 +413,21 @@ public class ConsumeJMSIT {
runner.setProperty(ConsumeJMS.DESTINATION, "foo");
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
- assertThrows(AssertionError.class, () -> runner.run());
- assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
+ assertCausedBy(UnknownHostException.class, runner::run);
+
+ assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
}
@Test
public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception {
+ final String nonExistentClassName = "DummyJMSConnectionFactoryClass";
+
TestRunner runner = TestRunners.newTestRunner(ConsumeJMS.class);
// using (non-JNDI) JMS Connection Factory via controller service
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider);
- runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, "DummyJMSConnectionFactoryClass");
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, nonExistentClassName);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, "DummyBrokerUri");
runner.enableControllerService(cfProvider);
@@ -428,10 +435,49 @@ public class ConsumeJMSIT {
runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
- assertThrows(AssertionError.class, () -> runner.run());
+ assertCausedBy(ClassNotFoundException.class, nonExistentClassName, runner::run);
+
assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
}
+ @Test
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ public void whenExceptionIsRaisedInAcceptTheProcessorShouldYieldAndRollback() throws Exception {
+ final String destination = "testQueue";
+ final RuntimeException expectedException = new RuntimeException();
+
+ final ConsumeJMS processor = new ConsumeJMS() {
+ @Override
+ protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
+ ProcessSession spiedSession = spy(processSession);
+ doThrow(expectedException).when(spiedSession).write(any(FlowFile.class), any(OutputStreamCallback.class));
+ super.rendezvousWithJms(context, spiedSession, consumer);
+ }
+ };
+
+ JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
+ try {
+ jmsTemplate.send(destination, session -> session.createTextMessage("msg"));
+
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(ConsumeJMS.DESTINATION, destination);
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+
+ assertCausedBy(expectedException, () -> runner.run(1, false));
+
+ assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
+ } finally {
+ ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java
index 9680b45ad2..f26fd3dc58 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java
@@ -18,7 +18,6 @@ package org.apache.nifi.jms.processors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
@@ -33,12 +32,10 @@ import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import static org.mockito.Mockito.mock;
-
@Disabled("Used for manual testing.")
public class ConsumeJMSManualTest {
@Test
- public void testTextMessage() throws Exception {
+ public void testTextMessage() {
MessageCreator messageCreator = session -> {
TextMessage message = session.createTextMessage("textMessageContent");
@@ -49,7 +46,7 @@ public class ConsumeJMSManualTest {
}
@Test
- public void testBytesMessage() throws Exception {
+ public void testBytesMessage() {
MessageCreator messageCreator = session -> {
BytesMessage message = session.createBytesMessage();
@@ -62,7 +59,7 @@ public class ConsumeJMSManualTest {
}
@Test
- public void testObjectMessage() throws Exception {
+ public void testObjectMessage() {
MessageCreator messageCreator = session -> {
ObjectMessage message = session.createObjectMessage();
@@ -75,7 +72,7 @@ public class ConsumeJMSManualTest {
}
@Test
- public void testStreamMessage() throws Exception {
+ public void testStreamMessage() {
MessageCreator messageCreator = session -> {
StreamMessage message = session.createStreamMessage();
@@ -98,7 +95,7 @@ public class ConsumeJMSManualTest {
}
@Test
- public void testMapMessage() throws Exception {
+ public void testMapMessage() {
MessageCreator messageCreator = session -> {
MapMessage message = session.createMapMessage();
@@ -121,14 +118,14 @@ public class ConsumeJMSManualTest {
}
@Test
- public void testUnsupportedMessage() throws Exception {
+ public void testUnsupportedMessage() {
MessageCreator messageCreator = session -> new ActiveMQMessage();
send(messageCreator);
}
- private void send(MessageCreator messageCreator) throws Exception {
- final String destinationName = "TEST";
+ private void send(MessageCreator messageCreator) {
+ final String destinationName = "TEST";
ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory);
@@ -139,9 +136,7 @@ public class ConsumeJMSManualTest {
jmsTemplate.setReceiveTimeout(10L);
try {
- JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-
- sender.jmsTemplate.send(destinationName, messageCreator);
+ jmsTemplate.send(destinationName, messageCreator);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 40f3a47955..b681bc09e8 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -58,7 +58,7 @@ import org.springframework.jms.support.JmsHeaders;
public class JMSPublisherConsumerIT {
@Test
- public void testObjectMessage() throws Exception {
+ public void testObjectMessage() {
final String destinationName = "testObjectMessage";
MessageCreator messageCreator = session -> {
@@ -136,7 +136,7 @@ public class JMSPublisherConsumerIT {
}
@Test
- public void testMapMessage() throws Exception {
+ public void testMapMessage() {
final String destinationName = "testObjectMessage";
MessageCreator messageCreator = session -> {
@@ -269,7 +269,7 @@ public class JMSPublisherConsumerIT {
* at which point this test will no be longer required.
*/
@Test
- public void validateFailOnUnsupportedMessageType() throws Exception {
+ public void validateFailOnUnsupportedMessageType() {
final String destinationName = "validateFailOnUnsupportedMessageType";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
@@ -332,13 +332,17 @@ public class JMSPublisherConsumerIT {
@Test
@Timeout(value = 20000, unit = TimeUnit.MILLISECONDS)
public void testMultipleThreads() throws Exception {
+ final int threadCount = 4;
+ final int totalMessageCount = 1000;
+ final int messagesPerThreadCount = totalMessageCount / threadCount;
+
String destinationName = "testMultipleThreads";
JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false);
- final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(4);
+ final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(threadCount);
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class));
- for (int i = 0; i < 4000; i++) {
+ for (int i = 0; i < totalMessageCount; i++) {
publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
@@ -359,7 +363,7 @@ public class JMSPublisherConsumerIT {
try {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
- for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
+ for (int j = 0; j < messagesPerThreadCount && msgCount.get() < totalMessageCount; j++) {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", callback);
}
} finally {
@@ -373,7 +377,7 @@ public class JMSPublisherConsumerIT {
}
int iterations = 0;
- while (msgCount.get() < 4000) {
+ while (msgCount.get() < totalMessageCount) {
Thread.sleep(10L);
if (++iterations % 100 == 0) {
System.out.println(msgCount.get() + " messages received so far");
@@ -389,7 +393,7 @@ public class JMSPublisherConsumerIT {
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
- public void validateMessageRedeliveryWhenNotAcked() throws Exception {
+ public void validateMessageRedeliveryWhenNotAcked() {
String destinationName = "validateMessageRedeliveryWhenNotAcked";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
@@ -426,6 +430,7 @@ public class JMSPublisherConsumerIT {
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
+ acknowledge(response);
}
});
}
@@ -467,6 +472,7 @@ public class JMSPublisherConsumerIT {
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
+ acknowledge(response);
}
});
}
@@ -478,6 +484,14 @@ public class JMSPublisherConsumerIT {
}
}
+ private void acknowledge(JMSResponse response) {
+ try {
+ response.acknowledge();
+ } catch (JMSException e) {
+ throw new IllegalStateException("Unable to acknowledge JMS message");
+ }
+ }
+
@Test
public void testMessageSelector() {
String destinationName = "testMessageSelector";
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 8e72c9fd98..802f4956b2 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -16,15 +16,6 @@
*/
package org.apache.nifi.jms.processors;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@@ -41,12 +32,17 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.URI;
@@ -56,12 +52,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.TextMessage;
-import javax.net.SocketFactory;
+import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class PublishJMSIT {
@@ -525,19 +523,23 @@ public class PublishJMSIT {
}
@Test
- public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception {
+ public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() {
+ final String nonExistentClassName = "DummyInitialContextFactoryClass";
+
TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);
// using JNDI JMS Connection Factory configured locally on the processor
- runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, "DummyInitialContextFactoryClass");
+ runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, nonExistentClassName);
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, "DummyProviderUrl");
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, "DummyConnectionFactoryName");
- runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
- runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+ runner.setProperty(AbstractJMSProcessor.DESTINATION, "myTopic");
+ runner.setProperty(AbstractJMSProcessor.DESTINATION_TYPE, AbstractJMSProcessor.TOPIC);
runner.enqueue("message");
- assertThrows(AssertionError.class, () -> runner.run());
+
+ assertCausedBy(ClassNotFoundException.class, nonExistentClassName, runner::run);
+
assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
}
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/AssertionUtils.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/AssertionUtils.java
new file mode 100644
index 0000000000..19aaaf5824
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/AssertionUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class AssertionUtils {
+
+ public static <T extends Throwable> void assertCausedBy(Class<T> expectedType, Runnable runnable) {
+ assertCausedBy(expectedType, null, runnable);
+ }
+
+ public static <T extends Throwable> void assertCausedBy(Class<T> expectedType, String expectedMessage, Runnable runnable) {
+ try {
+ runnable.run();
+ fail(String.format("Expected an exception to be thrown with a cause of %s, but nothing was thrown.", expectedType.getCanonicalName()));
+ } catch (Throwable throwable) {
+ final List<Throwable> causes = ExceptionUtils.getThrowableList(throwable);
+ for (Throwable cause : causes) {
+ if (expectedType.isInstance(cause)) {
+ if (expectedMessage != null) {
+ if (cause.getMessage() != null && cause.getMessage().startsWith(expectedMessage)) {
+ return;
+ }
+ } else {
+ return;
+ }
+ }
+ }
+ fail(String.format("Exception is thrown but not found %s as a cause. Received exception is: %s", expectedType.getCanonicalName(), throwable), throwable);
+ }
+ }
+
+ public static void assertCausedBy(Throwable expectedException, Runnable runnable) {
+ try {
+ runnable.run();
+ fail(String.format("Expected an exception to be thrown with a cause of %s, but nothing was thrown.", expectedException));
+ } catch (Throwable throwable) {
+ final List<Throwable> causes = ExceptionUtils.getThrowableList(throwable);
+ for (Throwable cause : causes) {
+ if (cause.equals(expectedException)) {
+ return;
+ }
+ }
+ fail(String.format("Exception is thrown but not found %s as a cause. Received exception is: %s", expectedException, throwable), throwable);
+ }
+ }
+}