You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/05/30 16:44:33 UTC
flume git commit: FLUME-2976 Exception when JMS source tries to
connect to a Weblogic server without authentication
Repository: flume
Updated Branches:
refs/heads/trunk 62c59a68d -> 30f6e39aa
FLUME-2976 Exception when JMS source tries to connect to a Weblogic server without authentication
Change the default password from the empty string ("") to an absent value (Optional.absent()) when no password file is configured.
Reviewers: Bessenyei Balazs Donat, Peter Turcsanyi, Ferenc Szabo
(Denes Arvay via Ferenc Szabo)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30f6e39a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30f6e39a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30f6e39a
Branch: refs/heads/trunk
Commit: 30f6e39aa66cbf2e3a7c3c77aa4ea68228b7e257
Parents: 62c59a6
Author: Ferenc Szabo <sz...@apache.org>
Authored: Wed May 30 18:42:30 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Wed May 30 18:42:30 2018 +0200
----------------------------------------------------------------------
.../org/apache/flume/source/jms/JMSSource.java | 2 +-
.../source/jms/TestIntegrationActiveMQ.java | 77 +++++++++++++++-----
2 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 72fc074..e5ed969 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -133,7 +133,7 @@ public class JMSSource extends AbstractPollableSource {
String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim();
if (passwordFile.isEmpty()) {
- password = Optional.of("");
+ password = Optional.absent();
} else {
try {
password = Optional.of(Files.toString(new File(passwordFile),
http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index 5a35d73..e13502e 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -21,6 +21,8 @@ import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -47,34 +49,70 @@ import org.apache.flume.channel.ChannelProcessor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
public class TestIntegrationActiveMQ {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestIntegrationActiveMQ.class);
+
private static final String INITIAL_CONTEXT_FACTORY =
"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
public static final String BROKER_BIND_URL = "tcp://localhost:61516";
private static final String DESTINATION_NAME = "test";
- private static final String USERNAME = "user";
- private static final String PASSWORD = "pass";
// specific for dynamic queues on ActiveMq
public static final String JNDI_PREFIX = "dynamicQueues/";
+ private enum TestMode {
+ WITH_AUTHENTICATION,
+ WITHOUT_AUTHENTICATION
+ }
+
private File baseDir;
private File tmpDir;
private File dataDir;
- private File passwordFile;
private BrokerService broker;
private Context context;
private JMSSource source;
private List<Event> events;
+ private final String jmsUserName;
+ private final String jmsPassword;
+
+ public TestIntegrationActiveMQ(TestMode testMode) {
+ LOGGER.info("Testing with test mode {}", testMode);
+
+ switch (testMode) {
+ case WITH_AUTHENTICATION:
+ jmsUserName = "user";
+ jmsPassword = "pass";
+ break;
+ case WITHOUT_AUTHENTICATION:
+ jmsUserName = null;
+ jmsPassword = null;
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled test mode: " + testMode);
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][]{
+ {TestMode.WITH_AUTHENTICATION},
+ {TestMode.WITHOUT_AUTHENTICATION}
+ });
+ }
@SuppressWarnings("unchecked")
@Before
@@ -83,26 +121,31 @@ public class TestIntegrationActiveMQ {
tmpDir = new File(baseDir, "tmp");
dataDir = new File(baseDir, "data");
Assert.assertTrue(tmpDir.mkdir());
- passwordFile = new File(baseDir, "password");
- Files.write(PASSWORD.getBytes(Charsets.UTF_8), passwordFile);
broker = new BrokerService();
-
broker.addConnector(BROKER_BIND_URL);
broker.setTmpDataDirectory(tmpDir);
broker.setDataDirectoryFile(dataDir);
- List<AuthenticationUser> users = Lists.newArrayList();
- users.add(new AuthenticationUser(USERNAME, PASSWORD, ""));
- SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users);
- broker.setPlugins(new BrokerPlugin[]{authentication});
- broker.start();
context = new Context();
context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
context.put(JMSSourceConfiguration.PROVIDER_URL, BROKER_BIND_URL);
context.put(JMSSourceConfiguration.DESTINATION_NAME, DESTINATION_NAME);
- context.put(JMSSourceConfiguration.USERNAME, USERNAME);
- context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath());
+
+ if (jmsUserName != null) {
+ File passwordFile = new File(baseDir, "password");
+ Files.write(jmsPassword.getBytes(Charsets.UTF_8), passwordFile);
+
+ AuthenticationUser jmsUser = new AuthenticationUser(jmsUserName, jmsPassword, "");
+ List<AuthenticationUser> users = Collections.singletonList(jmsUser);
+ SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users);
+ broker.setPlugins(new BrokerPlugin[]{authentication});
+
+ context.put(JMSSourceConfiguration.USERNAME, jmsUserName);
+ context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath());
+ }
+
+ broker.start();
events = Lists.newArrayList();
source = new JMSSource();
@@ -130,8 +173,8 @@ public class TestIntegrationActiveMQ {
}
private void putQueue(List<String> events) throws Exception {
- ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,
- PASSWORD, BROKER_BIND_URL);
+ ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword,
+ BROKER_BIND_URL);
Connection connection = factory.createConnection();
connection.start();
@@ -151,8 +194,8 @@ public class TestIntegrationActiveMQ {
}
private void putTopic(List<String> events) throws Exception {
- ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,
- PASSWORD, BROKER_BIND_URL);
+ ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword,
+ BROKER_BIND_URL);
Connection connection = factory.createConnection();
connection.start();