You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/07/11 11:58:28 UTC

flume git commit: FLUME-3237: Handling RuntimeExceptions coming from the JMS provider in JMSSource

Repository: flume
Updated Branches:
  refs/heads/trunk 719afe908 -> a76e2e9f2


FLUME-3237: Handling RuntimeExceptions coming from the JMS provider in JMSSource

RuntimeExceptions thrown by the JMS provider are now handled in the same way as
JMSExceptions, so that they also trigger the reconnect mechanism in JMSSource.

This closes #210

Reviewers: Endre Major, Ferenc Szabo

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a76e2e9f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a76e2e9f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a76e2e9f

Branch: refs/heads/trunk
Commit: a76e2e9f2cde18f3b7548b991535a3151425de12
Parents: 719afe9
Author: Ferenc Szabo <sz...@apache.org>
Authored: Wed Jul 11 13:56:02 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Wed Jul 11 13:56:02 2018 +0200

----------------------------------------------------------------------
 .../flume/source/jms/JMSMessageConsumer.java    | 30 ++++++++-
 .../source/jms/TestJMSMessageConsumer.java      | 64 ++++++++++++++++++++
 2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 3b4da81..b0b1c08 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -129,12 +129,12 @@ class JMSMessageConsumer {
   List<Event> take() throws JMSException {
     List<Event> result = new ArrayList<Event>(batchSize);
     Message message;
-    message = messageConsumer.receive(pollTimeout);
+    message = receive();
     if (message != null) {
       result.addAll(messageConverter.convert(message));
       int max = batchSize - 1;
       for (int i = 0; i < max; i++) {
-        message = messageConsumer.receiveNoWait();
+        message = receiveNoWait();
         if (message == null) {
           break;
         }
@@ -147,11 +147,35 @@ class JMSMessageConsumer {
     return result;
   }
 
+  private Message receive() throws JMSException {
+    try {
+      return messageConsumer.receive(pollTimeout);
+    } catch (RuntimeException runtimeException) {
+      JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+              + runtimeException.getMessage());
+      jmsException.setLinkedException(runtimeException);
+      throw jmsException;
+    }
+  }
+
+  private Message receiveNoWait() throws JMSException {
+    try {
+      return messageConsumer.receiveNoWait();
+    } catch (RuntimeException runtimeException) {
+      JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+              + runtimeException.getMessage());
+      jmsException.setLinkedException(runtimeException);
+      throw jmsException;
+    }
+  }
+
   void commit() {
     try {
       session.commit();
     } catch (JMSException jmsException) {
       logger.warn("JMS Exception processing commit", jmsException);
+    } catch (RuntimeException runtimeException) {
+      logger.warn("Runtime Exception processing commit", runtimeException);
     }
   }
 
@@ -160,6 +184,8 @@ class JMSMessageConsumer {
       session.rollback();
     } catch (JMSException jmsException) {
       logger.warn("JMS Exception processing rollback", jmsException);
+    } catch (RuntimeException runtimeException) {
+      logger.warn("Runtime Exception processing rollback", runtimeException);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index 711525e..41262af 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -175,4 +175,68 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
     verify(session, times(1)).createDurableSubscriber(topic, name, messageSelector, true);
   }
 
+  @Test(expected = JMSException.class)
+  public void testTakeFailsDueToJMSExceptionFromReceive() throws JMSException {
+    when(messageConsumer.receive(anyLong())).thenThrow(new JMSException(""));
+    consumer = create();
+
+    consumer.take();
+  }
+
+  @Test(expected = JMSException.class)
+  public void testTakeFailsDueToRuntimeExceptionFromReceive() throws JMSException {
+    when(messageConsumer.receive(anyLong())).thenThrow(new RuntimeException());
+    consumer = create();
+
+    consumer.take();
+  }
+
+  @Test(expected = JMSException.class)
+  public void testTakeFailsDueToJMSExceptionFromReceiveNoWait() throws JMSException {
+    when(messageConsumer.receiveNoWait()).thenThrow(new JMSException(""));
+    consumer = create();
+
+    consumer.take();
+  }
+
+  @Test(expected = JMSException.class)
+  public void testTakeFailsDueToRuntimeExceptionFromReceiveNoWait() throws JMSException {
+    when(messageConsumer.receiveNoWait()).thenThrow(new RuntimeException());
+    consumer = create();
+
+    consumer.take();
+  }
+
+  @Test
+  public void testCommitFailsDueToJMSException() throws JMSException {
+    doThrow(new JMSException("")).when(session).commit();
+    consumer = create();
+
+    consumer.commit();
+  }
+
+  @Test
+  public void testCommitFailsDueToRuntimeException() throws JMSException {
+    doThrow(new RuntimeException()).when(session).commit();
+    consumer = create();
+
+    consumer.commit();
+  }
+
+  @Test
+  public void testRollbackFailsDueToJMSException() throws JMSException {
+    doThrow(new JMSException("")).when(session).rollback();
+    consumer = create();
+
+    consumer.rollback();
+  }
+
+  @Test
+  public void testRollbackFailsDueToRuntimeException() throws JMSException {
+    doThrow(new RuntimeException()).when(session).rollback();
+    consumer = create();
+
+    consumer.rollback();
+  }
+
 }