You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/07/11 11:58:28 UTC
flume git commit: FLUME-3237: Handling RuntimeExceptions coming from the JMS provider in JMSSource
Repository: flume
Updated Branches:
refs/heads/trunk 719afe908 -> a76e2e9f2
FLUME-3237: Handling RuntimeExceptions coming from the JMS provider in JMSSource
Handling RuntimeExceptions in the same way as JMSExceptions in order to trigger
the reconnecting mechanism in JMSSource.
This closes #210
Reviewers: Endre Major, Ferenc Szabo
(Peter Turcsanyi via Ferenc Szabo)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a76e2e9f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a76e2e9f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a76e2e9f
Branch: refs/heads/trunk
Commit: a76e2e9f2cde18f3b7548b991535a3151425de12
Parents: 719afe9
Author: Ferenc Szabo <sz...@apache.org>
Authored: Wed Jul 11 13:56:02 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Wed Jul 11 13:56:02 2018 +0200
----------------------------------------------------------------------
.../flume/source/jms/JMSMessageConsumer.java | 30 ++++++++-
.../source/jms/TestJMSMessageConsumer.java | 64 ++++++++++++++++++++
2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 3b4da81..b0b1c08 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -129,12 +129,12 @@ class JMSMessageConsumer {
List<Event> take() throws JMSException {
List<Event> result = new ArrayList<Event>(batchSize);
Message message;
- message = messageConsumer.receive(pollTimeout);
+ message = receive();
if (message != null) {
result.addAll(messageConverter.convert(message));
int max = batchSize - 1;
for (int i = 0; i < max; i++) {
- message = messageConsumer.receiveNoWait();
+ message = receiveNoWait();
if (message == null) {
break;
}
@@ -147,11 +147,35 @@ class JMSMessageConsumer {
return result;
}
+ private Message receive() throws JMSException {
+ try {
+ return messageConsumer.receive(pollTimeout);
+ } catch (RuntimeException runtimeException) {
+ JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+ + runtimeException.getMessage());
+ jmsException.setLinkedException(runtimeException);
+ throw jmsException;
+ }
+ }
+
+ private Message receiveNoWait() throws JMSException {
+ try {
+ return messageConsumer.receiveNoWait();
+ } catch (RuntimeException runtimeException) {
+ JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+ + runtimeException.getMessage());
+ jmsException.setLinkedException(runtimeException);
+ throw jmsException;
+ }
+ }
+
void commit() {
try {
session.commit();
} catch (JMSException jmsException) {
logger.warn("JMS Exception processing commit", jmsException);
+ } catch (RuntimeException runtimeException) {
+ logger.warn("Runtime Exception processing commit", runtimeException);
}
}
@@ -160,6 +184,8 @@ class JMSMessageConsumer {
session.rollback();
} catch (JMSException jmsException) {
logger.warn("JMS Exception processing rollback", jmsException);
+ } catch (RuntimeException runtimeException) {
+ logger.warn("Runtime Exception processing rollback", runtimeException);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index 711525e..41262af 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -175,4 +175,68 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
verify(session, times(1)).createDurableSubscriber(topic, name, messageSelector, true);
}
+ @Test(expected = JMSException.class)
+ public void testTakeFailsDueToJMSExceptionFromReceive() throws JMSException {
+ when(messageConsumer.receive(anyLong())).thenThrow(new JMSException(""));
+ consumer = create();
+
+ consumer.take();
+ }
+
+ @Test(expected = JMSException.class)
+ public void testTakeFailsDueToRuntimeExceptionFromReceive() throws JMSException {
+ when(messageConsumer.receive(anyLong())).thenThrow(new RuntimeException());
+ consumer = create();
+
+ consumer.take();
+ }
+
+ @Test(expected = JMSException.class)
+ public void testTakeFailsDueToJMSExceptionFromReceiveNoWait() throws JMSException {
+ when(messageConsumer.receiveNoWait()).thenThrow(new JMSException(""));
+ consumer = create();
+
+ consumer.take();
+ }
+
+ @Test(expected = JMSException.class)
+ public void testTakeFailsDueToRuntimeExceptionFromReceiveNoWait() throws JMSException {
+ when(messageConsumer.receiveNoWait()).thenThrow(new RuntimeException());
+ consumer = create();
+
+ consumer.take();
+ }
+
+ @Test
+ public void testCommitFailsDueToJMSException() throws JMSException {
+ doThrow(new JMSException("")).when(session).commit();
+ consumer = create();
+
+ consumer.commit();
+ }
+
+ @Test
+ public void testCommitFailsDueToRuntimeException() throws JMSException {
+ doThrow(new RuntimeException()).when(session).commit();
+ consumer = create();
+
+ consumer.commit();
+ }
+
+ @Test
+ public void testRollbackFailsDueToJMSException() throws JMSException {
+ doThrow(new JMSException("")).when(session).rollback();
+ consumer = create();
+
+ consumer.rollback();
+ }
+
+ @Test
+ public void testRollbackFailsDueToRuntimeException() throws JMSException {
+ doThrow(new RuntimeException()).when(session).rollback();
+ consumer = create();
+
+ consumer.rollback();
+ }
+
}