You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/05/30 16:44:33 UTC

flume git commit: FLUME-2976 Exception when JMS source tries to connect to a Weblogic server without authentication

Repository: flume
Updated Branches:
  refs/heads/trunk 62c59a68d -> 30f6e39aa


FLUME-2976 Exception when JMS source tries to connect to a Weblogic server without authentication

changing the default "" value of the password to null

Reviewers: Bessenyei Balazs Donat, Peter Turcsanyi, Ferenc Szabo

(Denes Arvay via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30f6e39a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30f6e39a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30f6e39a

Branch: refs/heads/trunk
Commit: 30f6e39aa66cbf2e3a7c3c77aa4ea68228b7e257
Parents: 62c59a6
Author: Ferenc Szabo <sz...@apache.org>
Authored: Wed May 30 18:42:30 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Wed May 30 18:42:30 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/jms/JMSSource.java  |  2 +-
 .../source/jms/TestIntegrationActiveMQ.java     | 77 +++++++++++++++-----
 2 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 72fc074..e5ed969 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -133,7 +133,7 @@ public class JMSSource extends AbstractPollableSource {
     String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim();
 
     if (passwordFile.isEmpty()) {
-      password = Optional.of("");
+      password = Optional.absent();
     } else {
       try {
         password = Optional.of(Files.toString(new File(passwordFile),

http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index 5a35d73..e13502e 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -21,6 +21,8 @@ import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -47,34 +49,70 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class TestIntegrationActiveMQ {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestIntegrationActiveMQ.class);
+
   private static final String INITIAL_CONTEXT_FACTORY =
       "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
   public static final String BROKER_BIND_URL = "tcp://localhost:61516";
   private static final String DESTINATION_NAME = "test";
-  private static final String USERNAME = "user";
-  private static final String PASSWORD = "pass";
   // specific for dynamic queues on ActiveMq
   public static final String JNDI_PREFIX = "dynamicQueues/";
 
+  private enum TestMode {
+    WITH_AUTHENTICATION,
+    WITHOUT_AUTHENTICATION
+  }
+
   private File baseDir;
   private File tmpDir;
   private File dataDir;
-  private File passwordFile;
 
   private BrokerService broker;
   private Context context;
   private JMSSource source;
   private List<Event> events;
 
+  private final String jmsUserName;
+  private final String jmsPassword;
+
+  public TestIntegrationActiveMQ(TestMode testMode) {
+    LOGGER.info("Testing with test mode {}", testMode);
+
+    switch (testMode) {
+      case WITH_AUTHENTICATION:
+        jmsUserName = "user";
+        jmsPassword = "pass";
+        break;
+      case WITHOUT_AUTHENTICATION:
+        jmsUserName = null;
+        jmsPassword = null;
+        break;
+      default:
+        throw new IllegalArgumentException("Unhandled test mode: " + testMode);
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][]{
+      {TestMode.WITH_AUTHENTICATION},
+      {TestMode.WITHOUT_AUTHENTICATION}
+    });
+  }
 
   @SuppressWarnings("unchecked")
   @Before
@@ -83,26 +121,31 @@ public class TestIntegrationActiveMQ {
     tmpDir = new File(baseDir, "tmp");
     dataDir = new File(baseDir, "data");
     Assert.assertTrue(tmpDir.mkdir());
-    passwordFile = new File(baseDir, "password");
-    Files.write(PASSWORD.getBytes(Charsets.UTF_8), passwordFile);
 
     broker = new BrokerService();
-
     broker.addConnector(BROKER_BIND_URL);
     broker.setTmpDataDirectory(tmpDir);
     broker.setDataDirectoryFile(dataDir);
-    List<AuthenticationUser> users = Lists.newArrayList();
-    users.add(new AuthenticationUser(USERNAME, PASSWORD, ""));
-    SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users);
-    broker.setPlugins(new BrokerPlugin[]{authentication});
-    broker.start();
 
     context = new Context();
     context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
     context.put(JMSSourceConfiguration.PROVIDER_URL, BROKER_BIND_URL);
     context.put(JMSSourceConfiguration.DESTINATION_NAME, DESTINATION_NAME);
-    context.put(JMSSourceConfiguration.USERNAME, USERNAME);
-    context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath());
+
+    if (jmsUserName != null) {
+      File passwordFile = new File(baseDir, "password");
+      Files.write(jmsPassword.getBytes(Charsets.UTF_8), passwordFile);
+
+      AuthenticationUser jmsUser = new AuthenticationUser(jmsUserName, jmsPassword, "");
+      List<AuthenticationUser> users = Collections.singletonList(jmsUser);
+      SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users);
+      broker.setPlugins(new BrokerPlugin[]{authentication});
+
+      context.put(JMSSourceConfiguration.USERNAME, jmsUserName);
+      context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath());
+    }
+
+    broker.start();
 
     events = Lists.newArrayList();
     source = new JMSSource();
@@ -130,8 +173,8 @@ public class TestIntegrationActiveMQ {
   }
 
   private void putQueue(List<String> events) throws Exception {
-    ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,
-        PASSWORD, BROKER_BIND_URL);
+    ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword,
+        BROKER_BIND_URL);
     Connection connection = factory.createConnection();
     connection.start();
 
@@ -151,8 +194,8 @@ public class TestIntegrationActiveMQ {
   }
 
   private void putTopic(List<String> events) throws Exception {
-    ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,
-        PASSWORD, BROKER_BIND_URL);
+    ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword,
+        BROKER_BIND_URL);
     Connection connection = factory.createConnection();
     connection.start();