You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:17 UTC
[flume] 04/14: FLUME-3437 - Improve JMSSource validation
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
commit eee179a09df405c1ab55ae25a53b76ca1050bb97
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Fri Sep 23 00:10:42 2022 -0700
FLUME-3437 - Improve JMSSource validation
---
.../flume/source/jms/JMSMessageConsumer.java | 16 +----
.../flume/source/jms/JMSMessageConverter.java | 6 +-
.../org/apache/flume/source/jms/JMSSource.java | 68 +++++++++++++---------
.../flume/source/jms/TestIntegrationActiveMQ.java | 1 +
.../org/apache/flume/source/jms/TestJMSSource.java | 1 +
5 files changed, 46 insertions(+), 46 deletions(-)
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 5375bd030..84409be67 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -35,14 +35,11 @@ import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
class JMSMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumer.class);
- private static final String JAVA_SCHEME = "java";
private final int batchSize;
private final long pollTimeout;
@@ -102,14 +99,7 @@ class JMSMessageConsumer {
throw new IllegalStateException(String.valueOf(destinationType));
}
} else {
- try {
- URI uri = new URI(destinationName);
- String scheme = uri.getScheme();
- assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
- "Unsupported JNDI URI: " + destinationName);
- } catch (URISyntaxException ex) {
- logger.warn("Invalid JNDI URI - {}", destinationName);
- }
+ JMSSource.verifyContext(destinationName);
destination = (Destination) initialContext.lookup(destinationName);
}
} catch (JMSException e) {
@@ -220,8 +210,4 @@ class JMSMessageConsumer {
logger.error("Could not destroy connection", e);
}
}
-
- private void assertTrue(boolean arg, String msg) {
- Preconditions.checkArgument(arg, msg);
- }
}
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
index 7d02809b4..31a15b874 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
@@ -39,13 +39,13 @@ import org.apache.flume.annotations.InterfaceStability;
@InterfaceStability.Stable
public interface JMSMessageConverter {
- public List<Event> convert(Message message) throws JMSException;
+ List<Event> convert(Message message) throws JMSException;
/**
* Implementors of JMSMessageConverter must either provide
* a suitable builder or implement the Configurable interface.
*/
- public interface Builder {
- public JMSMessageConverter build(Context context);
+ interface Builder {
+ JMSMessageConverter build(Context context);
}
}
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 216225871..43df14518 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -21,6 +21,8 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
@@ -56,6 +58,7 @@ import com.google.common.io.Files;
public class JMSSource extends AbstractPollableSource implements BatchSizeSupported {
private static final Logger logger = LoggerFactory.getLogger(JMSSource.class);
private static final String JAVA_SCHEME = "java";
+ public static final String JNDI_ALLOWED_PROTOCOLS = "JndiAllowedProtocols";
// setup by constructor
private final InitialContextFactory initialContextFactory;
@@ -82,6 +85,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
private int jmsExceptionCounter;
private InitialContext initialContext;
+ private static List<String> allowedSchemes = getAllowedProtocols();
public JMSSource() {
this(new InitialContextFactory());
@@ -92,6 +96,34 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
this.initialContextFactory = initialContextFactory;
}
+ private static List<String> getAllowedProtocols() {
+ String allowed = System.getProperty(JNDI_ALLOWED_PROTOCOLS, null);
+ if (allowed == null) {
+ return Collections.singletonList(JAVA_SCHEME);
+ } else {
+ String[] items = allowed.split(",");
+ List<String> schemes = new ArrayList<>();
+ schemes.add(JAVA_SCHEME);
+ for (String item : items) {
+ if (!item.equals(JAVA_SCHEME)) {
+ schemes.add(item.trim());
+ }
+ }
+ return schemes;
+ }
+ }
+
+ public static void verifyContext(String location) {
+ try {
+ String scheme = new URI(location).getScheme();
+ if (scheme != null && !allowedSchemes.contains(scheme)) {
+ throw new IllegalArgumentException("Invalid JNDI URI: " + location);
+ }
+ } catch (URISyntaxException ex) {
+ logger.trace("{}} is not a valid URI", location);
+ }
+ }
+
@Override
protected void doConfigure(Context context) throws FlumeException {
sourceCounter = new SourceCounter(getName());
@@ -100,14 +132,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim();
providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim();
- try {
- URI uri = new URI(providerUrl);
- String scheme = uri.getScheme();
- assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
- "Unsupported JNDI URI: " + providerUrl);
- } catch (URISyntaxException ex) {
- logger.warn("Invalid JNDI URI - {}", providerUrl);
- }
+ verifyContext(providerUrl);
destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim();
@@ -190,14 +215,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
String connectionFactoryName = context.getString(
JMSSourceConfiguration.CONNECTION_FACTORY,
JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim();
- try {
- URI uri = new URI(connectionFactoryName);
- String scheme = uri.getScheme();
- assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
- "Unsupported JNDI URI: " + connectionFactoryName);
- } catch (URISyntaxException ex) {
- logger.warn("Invalid JNDI URI - {}", connectionFactoryName);
- }
+ verifyContext(connectionFactoryName);
assertNotEmpty(initialContextFactoryName, String.format(
"Initial Context Factory is empty. This is specified by %s",
@@ -291,10 +309,6 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
Preconditions.checkArgument(!arg.isEmpty(), msg);
}
- private void assertTrue(boolean arg, String msg) {
- Preconditions.checkArgument(arg, msg);
- }
-
@Override
protected synchronized Status doProcess() throws EventDeliveryException {
boolean error = true;
@@ -322,14 +336,12 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
sourceCounter.incrementChannelWriteFail();
} catch (JMSException jmsException) {
logger.warn("JMSException consuming events", jmsException);
- if (++jmsExceptionCounter > errorThreshold) {
- if (consumer != null) {
- logger.warn("Exceeded JMSException threshold, closing consumer");
- sourceCounter.incrementEventReadFail();
- consumer.rollback();
- consumer.close();
- consumer = null;
- }
+ if (++jmsExceptionCounter > errorThreshold && consumer != null) {
+ logger.warn("Exceeded JMSException threshold, closing consumer");
+ sourceCounter.incrementEventReadFail();
+ consumer.rollback();
+ consumer.close();
+ consumer = null;
}
} catch (Throwable throwable) {
logger.error("Unexpected error processing events", throwable);
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index e13502e2d..10320d749 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -90,6 +90,7 @@ public class TestIntegrationActiveMQ {
private final String jmsPassword;
public TestIntegrationActiveMQ(TestMode testMode) {
+ System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "tcp");
LOGGER.info("Testing with test mode {}", testMode);
switch (testMode) {
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index 6414428aa..e4c8cb442 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -64,6 +64,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
@SuppressWarnings("unchecked")
@Override
void afterSetup() throws Exception {
+ System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "dummy");
baseDir = Files.createTempDir();
passwordFile = new File(baseDir, "password");
Assert.assertTrue(passwordFile.createNewFile());