You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:17 UTC

[flume] 04/14: FLUME-3437 - Improve JMSSource validation

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git

commit eee179a09df405c1ab55ae25a53b76ca1050bb97
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Fri Sep 23 00:10:42 2022 -0700

    FLUME-3437 - Improve JMSSource validation
---
 .../flume/source/jms/JMSMessageConsumer.java       | 16 +----
 .../flume/source/jms/JMSMessageConverter.java      |  6 +-
 .../org/apache/flume/source/jms/JMSSource.java     | 68 +++++++++++++---------
 .../flume/source/jms/TestIntegrationActiveMQ.java  |  1 +
 .../org/apache/flume/source/jms/TestJMSSource.java |  1 +
 5 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 5375bd030..84409be67 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -35,14 +35,11 @@ import javax.jms.Session;
 import javax.jms.Topic;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
 class JMSMessageConsumer {
   private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumer.class);
-  private static final String JAVA_SCHEME = "java";
 
   private final int batchSize;
   private final long pollTimeout;
@@ -102,14 +99,7 @@ class JMSMessageConsumer {
               throw new IllegalStateException(String.valueOf(destinationType));
           }
         } else {
-          try {
-            URI uri = new URI(destinationName);
-            String scheme = uri.getScheme();
-            assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
-                "Unsupported JNDI URI: " + destinationName);
-          } catch (URISyntaxException ex) {
-            logger.warn("Invalid JNDI URI - {}", destinationName);
-          }
+          JMSSource.verifyContext(destinationName);
           destination = (Destination) initialContext.lookup(destinationName);
         }
       } catch (JMSException e) {
@@ -220,8 +210,4 @@ class JMSMessageConsumer {
       logger.error("Could not destroy connection", e);
     }
   }
-
-  private void assertTrue(boolean arg, String msg) {
-    Preconditions.checkArgument(arg, msg);
-  }
 }
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
index 7d02809b4..31a15b874 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java
@@ -39,13 +39,13 @@ import org.apache.flume.annotations.InterfaceStability;
 @InterfaceStability.Stable
 public interface JMSMessageConverter {
 
-  public List<Event> convert(Message message) throws JMSException;
+  List<Event> convert(Message message) throws JMSException;
 
   /**
    * Implementors of JMSMessageConverter must either provide
    * a suitable builder or implement the Configurable interface.
    */
-  public interface Builder {
-    public JMSMessageConverter build(Context context);
+  interface Builder {
+    JMSMessageConverter build(Context context);
   }
 }
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 216225871..43df14518 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
@@ -56,6 +58,7 @@ import com.google.common.io.Files;
 public class JMSSource extends AbstractPollableSource implements BatchSizeSupported {
   private static final Logger logger = LoggerFactory.getLogger(JMSSource.class);
   private static final String JAVA_SCHEME = "java";
+  public static final String JNDI_ALLOWED_PROTOCOLS = "JndiAllowedProtocols";
 
   // setup by constructor
   private final InitialContextFactory initialContextFactory;
@@ -82,6 +85,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
 
   private int jmsExceptionCounter;
   private InitialContext initialContext;
+  private static List<String> allowedSchemes = getAllowedProtocols();
 
   public JMSSource() {
     this(new InitialContextFactory());
@@ -92,6 +96,49 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
     this.initialContextFactory = initialContextFactory;
   }
 
+  /**
+   * Builds the list of URI schemes accepted by {@link #verifyContext(String)}.
+   * The "java" scheme is always allowed; further schemes are read from the
+   * comma-separated system property named by {@link #JNDI_ALLOWED_PROTOCOLS}.
+   * Entries are trimmed before use; blank and repeated entries are ignored.
+   */
+  private static List<String> getAllowedProtocols() {
+    String allowed = System.getProperty(JNDI_ALLOWED_PROTOCOLS, null);
+    if (allowed == null) {
+      return Collections.singletonList(JAVA_SCHEME);
+    }
+    List<String> schemes = new ArrayList<>();
+    schemes.add(JAVA_SCHEME);
+    for (String item : allowed.split(",")) {
+      // Trim before comparing so that " java" or "tcp, tcp" cannot add duplicates.
+      String scheme = item.trim();
+      if (!scheme.isEmpty() && !schemes.contains(scheme)) {
+        schemes.add(scheme);
+      }
+    }
+    return schemes;
+  }
+
+  /**
+   * Rejects JNDI names whose URI scheme is not in the allowed list.
+   * Names without a scheme, and names that cannot be parsed as a URI, are accepted.
+   *
+   * @param location the JNDI name or URL to check
+   * @throws IllegalArgumentException if the scheme of {@code location} is not allowed
+   */
+  public static void verifyContext(String location) {
+    try {
+      String scheme = new URI(location).getScheme();
+      if (scheme != null && !allowedSchemes.contains(scheme)) {
+        throw new IllegalArgumentException("Invalid JNDI URI: " + location);
+      }
+    } catch (URISyntaxException ex) {
+      // NOTE(review): an unparseable name is accepted without any scheme check;
+      // confirm this is intended, since the name is still passed to JNDI by callers.
+      logger.trace("{} is not a valid URI", location);
+    }
+  }
+
   @Override
   protected void doConfigure(Context context) throws FlumeException {
     sourceCounter = new SourceCounter(getName());
@@ -100,14 +132,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
         JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim();
 
     providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim();
-    try {
-      URI uri = new URI(providerUrl);
-      String scheme = uri.getScheme();
-      assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
-          "Unsupported JNDI URI: " + providerUrl);
-    } catch (URISyntaxException ex) {
-      logger.warn("Invalid JNDI URI - {}", providerUrl);
-    }
+    verifyContext(providerUrl);
 
     destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim();
 
@@ -190,14 +215,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
     String connectionFactoryName = context.getString(
         JMSSourceConfiguration.CONNECTION_FACTORY,
         JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim();
-    try {
-      URI uri = new URI(connectionFactoryName);
-      String scheme = uri.getScheme();
-      assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
-          "Unsupported JNDI URI: " + connectionFactoryName);
-    } catch (URISyntaxException ex) {
-      logger.warn("Invalid JNDI URI - {}", connectionFactoryName);
-    }
+    verifyContext(connectionFactoryName);
 
     assertNotEmpty(initialContextFactoryName, String.format(
         "Initial Context Factory is empty. This is specified by %s",
@@ -291,10 +309,6 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
     Preconditions.checkArgument(!arg.isEmpty(), msg);
   }
 
-  private void assertTrue(boolean arg, String msg) {
-    Preconditions.checkArgument(arg, msg);
-  }
-
   @Override
   protected synchronized Status doProcess() throws EventDeliveryException {
     boolean error = true;
@@ -322,14 +336,12 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor
       sourceCounter.incrementChannelWriteFail();
     } catch (JMSException jmsException) {
       logger.warn("JMSException consuming events", jmsException);
-      if (++jmsExceptionCounter > errorThreshold) {
-        if (consumer != null) {
-          logger.warn("Exceeded JMSException threshold, closing consumer");
-          sourceCounter.incrementEventReadFail();
-          consumer.rollback();
-          consumer.close();
-          consumer = null;
-        }
+      if (++jmsExceptionCounter > errorThreshold && consumer != null) {
+        logger.warn("Exceeded JMSException threshold, closing consumer");
+        sourceCounter.incrementEventReadFail();
+        consumer.rollback();
+        consumer.close();
+        consumer = null;
       }
     } catch (Throwable throwable) {
       logger.error("Unexpected error processing events", throwable);
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index e13502e2d..10320d749 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -90,6 +90,7 @@ public class TestIntegrationActiveMQ {
   private final String jmsPassword;
 
   public TestIntegrationActiveMQ(TestMode testMode) {
+    System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "tcp");
     LOGGER.info("Testing with test mode {}", testMode);
 
     switch (testMode) {
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index 6414428aa..e4c8cb442 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -64,6 +64,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
   @SuppressWarnings("unchecked")
   @Override
   void afterSetup() throws Exception {
+    System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "dummy");
     baseDir = Files.createTempDir();
     passwordFile = new File(baseDir, "password");
     Assert.assertTrue(passwordFile.createNewFile());