You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/06/21 14:16:04 UTC

flume git commit: FLUME-2579 Support durable subscription in JMSSource

Repository: flume
Updated Branches:
  refs/heads/trunk f2996cca5 -> 857df3fe9


FLUME-2579 Support durable subscription in JMSSource

Until now, JMSSource created only non-durable subscriptions, which could lead to event loss
when the destination type is topic.

This change enables durable subscription creation and lets the user specify a client ID.
It also removes JMSMessageConsumerFactory, which provided no additional value.

This closes #120.

Reviewers: Attila Simon, Denes Arvay

(Andras Beni via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/857df3fe
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/857df3fe
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/857df3fe

Branch: refs/heads/trunk
Commit: 857df3fe9b211a0e34f17cc6d776a1296951134a
Parents: f2996cc
Author: Andras Beni <an...@cloudera.com>
Authored: Tue Mar 14 16:31:42 2017 +0100
Committer: Denes Arvay <de...@apache.org>
Committed: Wed Jun 21 16:15:18 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  6 +++
 .../flume/source/jms/JMSMessageConsumer.java    | 18 +++++--
 .../source/jms/JMSMessageConsumerFactory.java   | 36 -------------
 .../org/apache/flume/source/jms/JMSSource.java  | 55 ++++++++++++++++----
 .../source/jms/JMSSourceConfiguration.java      |  8 +++
 .../source/jms/JMSMessageConsumerTestBase.java  |  2 +-
 .../source/jms/TestIntegrationActiveMQ.java     | 52 ++++++++++++++++++
 .../source/jms/TestJMSMessageConsumer.java      | 18 +++++++
 .../apache/flume/source/jms/TestJMSSource.java  | 19 ++-----
 9 files changed, 148 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fde56ec..2073bf6 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -954,6 +954,12 @@ batchSize                   100          Number of messages to consume in one ba
 converter.type              DEFAULT      Class to use to convert messages to flume events. See below.
 converter.*                 --           Converter properties.
 converter.charset           UTF-8        Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
+createDurableSubscription   false        Whether to create a durable subscription. Durable subscriptions can only be used with
+                                         destinationType topic. If true, "clientId" and "durableSubscriptionName"
+                                         have to be specified.
+clientId                    --           JMS client identifier set on Connection right after it is created.
+                                         Required for durable subscriptions.
+durableSubscriptionName     --           Name used to identify the durable subscription. Required for durable subscriptions.
 =========================   ===========  ==============================================================
 
 

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 6b3a1cf..3b4da81 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -32,6 +32,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.jms.Topic;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import java.util.ArrayList;
@@ -54,7 +55,9 @@ class JMSMessageConsumer {
                      String destinationName, JMSDestinationLocator destinationLocator,
                      JMSDestinationType destinationType, String messageSelector, int batchSize,
                      long pollTimeout, JMSMessageConverter messageConverter,
-                     Optional<String> userName, Optional<String> password) {
+                     Optional<String> userName, Optional<String> password,
+                     Optional<String> clientId, boolean createDurableSubscription,
+                     String durableSubscriptionName) {
     this.batchSize = batchSize;
     this.pollTimeout = pollTimeout;
     this.messageConverter = messageConverter;
@@ -69,6 +72,9 @@ class JMSMessageConsumer {
       } else {
         connection = connectionFactory.createConnection();
       }
+      if (clientId.isPresent()) {
+        connection.setClientID(clientId.get());
+      }
       connection.start();
     } catch (JMSException e) {
       throw new FlumeException("Could not create connection to broker", e);
@@ -102,8 +108,14 @@ class JMSMessageConsumer {
     }
 
     try {
-      messageConsumer = session.createConsumer(destination,
-          messageSelector.isEmpty() ? null : messageSelector);
+      if (createDurableSubscription) {
+        messageConsumer = session.createDurableSubscriber(
+            (Topic) destination, durableSubscriptionName,
+            messageSelector.isEmpty() ? null : messageSelector, true);
+      } else {
+        messageConsumer = session.createConsumer(destination,
+            messageSelector.isEmpty() ? null : messageSelector);
+      }
     } catch (JMSException e) {
       throw new FlumeException("Could not create consumer", e);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
deleted file mode 100644
index 9747a31..0000000
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.jms;
-
-import javax.jms.ConnectionFactory;
-import javax.naming.InitialContext;
-
-import com.google.common.base.Optional;
-
-public class JMSMessageConsumerFactory {
-
-  JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory connectionFactory,
-      String destinationName, JMSDestinationType destinationType,
-      JMSDestinationLocator destinationLocator, String messageSelector, int batchSize,
-      long pollTimeout, JMSMessageConverter messageConverter,
-      Optional<String> userName, Optional<String> password) {
-    return new JMSMessageConsumer(initialContext, connectionFactory, destinationName,
-        destinationLocator, destinationType, messageSelector, batchSize, pollTimeout,
-        messageConverter, userName, password);
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 7631827..72fc074 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -53,7 +54,6 @@ public class JMSSource extends AbstractPollableSource {
   private static final Logger logger = LoggerFactory.getLogger(JMSSource.class);
 
   // setup by constructor
-  private final JMSMessageConsumerFactory consumerFactory;
   private final InitialContextFactory initialContextFactory;
 
   // setup by configuration
@@ -72,19 +72,19 @@ public class JMSSource extends AbstractPollableSource {
   private SourceCounter sourceCounter;
   private int errorThreshold;
   private long pollTimeout;
+  private Optional<String> clientId;
+  private boolean createDurableSubscription;
+  private String durableSubscriptionName;
 
   private int jmsExceptionCounter;
   private InitialContext initialContext;
 
   public JMSSource() {
-    this(new JMSMessageConsumerFactory(), new InitialContextFactory());
+    this(new InitialContextFactory());
   }
 
-  @VisibleForTesting
-  public JMSSource(JMSMessageConsumerFactory consumerFactory,
-                   InitialContextFactory initialContextFactory) {
+  public JMSSource(InitialContextFactory initialContextFactory) {
     super();
-    this.consumerFactory = consumerFactory;
     this.initialContextFactory = initialContextFactory;
   }
 
@@ -121,6 +121,15 @@ public class JMSSource extends AbstractPollableSource {
     pollTimeout = context.getLong(JMSSourceConfiguration.POLL_TIMEOUT,
         JMSSourceConfiguration.POLL_TIMEOUT_DEFAULT);
 
+    clientId = Optional.fromNullable(context.getString(JMSSourceConfiguration.CLIENT_ID));
+
+    createDurableSubscription = context.getBoolean(
+        JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION, 
+        JMSSourceConfiguration.DEFAULT_CREATE_DURABLE_SUBSCRIPTION);
+    durableSubscriptionName = context.getString(
+        JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME, 
+        JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME);
+
     String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim();
 
     if (passwordFile.isEmpty()) {
@@ -193,6 +202,30 @@ public class JMSSource extends AbstractPollableSource {
           "invalid.", destinationTypeName), e);
     }
 
+    if (createDurableSubscription) {
+      if (JMSDestinationType.TOPIC != destinationType) {
+        throw new FlumeException(String.format(
+            "Only Destination type '%s' supports durable subscriptions.",
+            JMSDestinationType.TOPIC.toString()));
+      }
+      if (!clientId.isPresent()) {
+        throw new FlumeException(String.format(
+            "You have to specify '%s' when using durable subscriptions.",
+            JMSSourceConfiguration.CLIENT_ID));
+      }
+      if (StringUtils.isEmpty(durableSubscriptionName)) {
+        throw new FlumeException(String.format("If '%s' is set to true, '%s' has to be specified.",
+            JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION,
+            JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME));
+      }
+    } else if (!StringUtils.isEmpty(durableSubscriptionName)) {
+      logger.warn(String.format("'%s' is set, but '%s' is false."
+          + "If you want to create a durable subscription, set %s to true.",
+          JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME,
+          JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION,
+          JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION));
+    }
+
     try {
       destinationLocator = JMSDestinationLocator.valueOf(destinationLocatorName);
     } catch (IllegalArgumentException e) {
@@ -312,11 +345,13 @@ public class JMSSource extends AbstractPollableSource {
     sourceCounter.stop();
   }
 
-  private JMSMessageConsumer createConsumer() throws JMSException {
+  @VisibleForTesting
+  JMSMessageConsumer createConsumer() throws JMSException {
     logger.info("Creating new consumer for " + destinationName);
-    JMSMessageConsumer consumer = consumerFactory.create(initialContext,
-        connectionFactory, destinationName, destinationType, destinationLocator,
-        messageSelector, batchSize, pollTimeout, converter, userName, password);
+    JMSMessageConsumer consumer = new JMSMessageConsumer(initialContext,
+        connectionFactory, destinationName, destinationLocator, destinationType,
+        messageSelector, batchSize, pollTimeout, converter, userName, password, clientId,
+        createDurableSubscription, durableSubscriptionName);
     jmsExceptionCounter = 0;
     return consumer;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
index 98bf8ab..adb167b 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
@@ -37,6 +37,8 @@ public class JMSSourceConfiguration {
 
   public static final String MESSAGE_SELECTOR = "messageSelector";
 
+  public static final String CLIENT_ID = "clientId";
+
   public static final String USERNAME = "userName";
 
   public static final String PASSWORD_FILE = "passwordFile";
@@ -58,4 +60,10 @@ public class JMSSourceConfiguration {
   public static final String CONVERTER_CHARSET = CONVERTER + ".charset";
   public static final String CONVERTER_CHARSET_DEFAULT = "UTF-8";
 
+  public static final String CREATE_DURABLE_SUBSCRIPTION = "createDurableSubscription";
+  public static final boolean DEFAULT_CREATE_DURABLE_SUBSCRIPTION = false;
+
+  public static final String DURABLE_SUBSCRIPTION_NAME = "durableSubscriptionName";
+  public static final String DEFAULT_DURABLE_SUBSCRIPTION_NAME = "";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
index b8466f7..b3bce78 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
@@ -132,7 +132,7 @@ public abstract class JMSMessageConsumerTestBase {
   JMSMessageConsumer create() {
     return new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName,
         destinationLocator, destinationType, messageSelector, batchSize,
-        pollTimeout, converter, userName, password);
+        pollTimeout, converter, userName, password, Optional.<String>absent(), false, "");
   }
   @After
   public void tearDown() throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index 53cc09a..5a35d73 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -235,4 +235,56 @@ public class TestIntegrationActiveMQ {
     Collections.sort(actual);
     Assert.assertEquals(expected, actual);
   }
+
+  @Test
+  public void testDurableSubscription() throws Exception {
+    context.put(JMSSourceConfiguration.DESTINATION_TYPE,
+        JMSSourceConfiguration.DESTINATION_TYPE_TOPIC);
+    context.put(JMSSourceConfiguration.CLIENT_ID, "FLUME");
+    context.put(JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME, "SOURCE1");
+    context.put(JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION, "true");
+    context.put(JMSSourceConfiguration.BATCH_SIZE, "10");
+    source.configure(context);
+    source.start();
+    Thread.sleep(5000L);
+    List<String> expected = Lists.newArrayList();
+    List<String> input = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      input.add("before " + String.valueOf(i));
+    }
+    expected.addAll(input);
+    putTopic(input);
+
+    Thread.sleep(500L);
+    Assert.assertEquals(Status.READY, source.process());
+    Assert.assertEquals(Status.BACKOFF, source.process());
+    source.stop();
+    Thread.sleep(500L);
+    input = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      input.add("during " + String.valueOf(i));
+    }
+    expected.addAll(input);
+    putTopic(input);
+    source.start();
+    Thread.sleep(500L);
+    input = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      input.add("after " + String.valueOf(i));
+    }
+    expected.addAll(input);
+    putTopic(input);
+
+    Assert.assertEquals(Status.READY, source.process());
+    Assert.assertEquals(Status.READY, source.process());
+    Assert.assertEquals(Status.BACKOFF, source.process());
+    Assert.assertEquals(expected.size(), events.size());
+    List<String> actual = Lists.newArrayList();
+    for (Event event : events) {
+      actual.add(new String(event.getBody(), Charsets.UTF_8));
+    }
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index dcb47d9..711525e 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -27,6 +27,8 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -157,4 +159,20 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
     verify(session, times(1)).close();
     verify(connection, times(1)).close();
   }
+
+  @Test
+  public void testCreateDurableSubscription() throws Exception {
+    String name = "SUBSCRIPTION_NAME";
+    String clientID = "CLIENT_ID";
+    TopicSubscriber mockTopicSubscriber = mock(TopicSubscriber.class);
+    when(session.createDurableSubscriber(any(Topic.class), anyString(), anyString(), anyBoolean()))
+      .thenReturn(mockTopicSubscriber );
+    when(session.createTopic(destinationName)).thenReturn(topic);
+    new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName, destinationLocator,
+        JMSDestinationType.TOPIC, messageSelector, batchSize, pollTimeout, converter, userName,
+        password, Optional.of(clientID), true, name);
+    verify(connection, times(1)).setClientID(clientID);
+    verify(session, times(1)).createDurableSubscriber(topic, name, messageSelector, true);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/857df3fe/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index cdc09b5..ed81b75 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.naming.InitialContext;
@@ -46,7 +45,6 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
@@ -57,7 +55,6 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
   private InitialContext initialContext;
   private ChannelProcessor channelProcessor;
   private List<Event> events;
-  private JMSMessageConsumerFactory consumerFactory;
   private InitialContextFactory contextFactory;
   private File baseDir;
   private File passwordFile;
@@ -78,17 +75,12 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
         return null;
       }
     }).when(channelProcessor).processEventBatch(any(List.class));
-    consumerFactory = mock(JMSMessageConsumerFactory.class);
     consumer = spy(create());
-    when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class),
-                                anyString(), any(JMSDestinationType.class),
-                                any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(),
-                                any(JMSMessageConverter.class), any(Optional.class),
-                                any(Optional.class))).thenReturn(consumer);
     when(initialContext.lookup(anyString())).thenReturn(connectionFactory);
     contextFactory = mock(InitialContextFactory.class);
     when(contextFactory.create(any(Properties.class))).thenReturn(initialContext);
-    source = new JMSSource(consumerFactory, contextFactory);
+    source = spy(new JMSSource(contextFactory));
+    doReturn(consumer).when(source).createConsumer();
     source.setName("JMSSource-" + UUID.randomUUID());
     source.setChannelProcessor(channelProcessor);
     context = new Context();
@@ -143,14 +135,9 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
     source.configure(context);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testStartConsumerCreateThrowsException() throws Exception {
-    when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class),
-                                anyString(), any(JMSDestinationType.class),
-                                any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(),
-                                any(JMSMessageConverter.class), any(Optional.class),
-                                any(Optional.class))).thenThrow(new RuntimeException());
+    doThrow(new RuntimeException("Expected")).when(source).createConsumer();
     source.configure(context);
     source.start();
     try {