You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/17 00:45:53 UTC

[nifi] branch master updated: NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment (#4341)

This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c18b27a  NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment (#4341)
c18b27a is described below

commit c18b27af185453816dffe6e9ed22fe9be41646f6
Author: Joey <jf...@apache.org>
AuthorDate: Tue Jun 16 19:45:37 2020 -0500

    NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment (#4341)
    
    * NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment
    
    This also fixes NIFI-4760.
    
    * NIFI-7540: Remove duplicate mail.smtp.starttls.enable from TestListenSMTP
    
    Signed-off-by: Andy LoPresto <al...@apache.org>
---
 .../apache/nifi/remote/io/socket/NetworkUtils.java |  37 +++-
 .../nifi-email-processors/pom.xml                  |   3 +-
 .../nifi/processors/email/TestListenSMTP.java      | 229 ++++++++++-----------
 .../nifi/processors/standard/TestListFile.java     |  24 ++-
 4 files changed, 163 insertions(+), 130 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
index b2deecc..6be3293 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
@@ -17,11 +17,12 @@
 package org.apache.nifi.remote.io.socket;
 
 import java.io.IOException;
+import java.net.Socket;
 import java.net.ServerSocket;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
-/**
- *
- */
 public class NetworkUtils {
 
     /**
@@ -43,4 +44,34 @@ public class NetworkUtils {
             }
         }
     }
+
+    public final static boolean isListening(final String hostname, final int port) {
+        try (final Socket s = new Socket(hostname, port)) {
+            return s.isConnected();
+        } catch (final Exception ignore) {}
+        return false;
+    }
+
+    public final static boolean isListening(final String hostname, final int port, final int timeoutMillis) {
+        Boolean result = false;
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            result = executor.submit(() -> {
+                while(!isListening(hostname, port)) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (final Exception ignore) {}
+                }
+                return true;
+            }).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (final Exception ignore) {} finally {
+            try {
+                executor.shutdown();
+            } catch (final Exception ignore) {}
+        }
+
+        return (result != null && result);
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
index 2b693d0..29a21d9 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
@@ -54,7 +54,7 @@
         <dependency>
             <groupId>com.sun.mail</groupId>
             <artifactId>javax.mail</artifactId>
-            <version>1.5.6</version>
+            <version>1.6.2</version>
         </dependency>
         <dependency>
             <groupId>org.subethamail</groupId>
@@ -125,4 +125,3 @@
         </dependency>
     </dependencies>
 </project>
-
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
index 325bfaf..2e6c783 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
@@ -18,12 +18,15 @@ package org.apache.nifi.processors.email;
 
 import static org.junit.Assert.assertTrue;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.mail.Email;
-import org.apache.commons.mail.SimpleEmail;
+import java.util.Properties;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.ssl.SSLContextService;
@@ -31,77 +34,59 @@ import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
 import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
+
 import org.junit.Test;
 
 public class TestListenSMTP {
 
-    private ScheduledExecutorService executor;
-
-    @Before
-    public void before() {
-        this.executor = Executors.newScheduledThreadPool(2);
-    }
-
-    @After
-    public void after() {
-        this.executor.shutdown();
-    }
-
     @Test
-    public void validateSuccessfulInteraction() throws Exception {
-        int port = NetworkUtils.availablePort();
+    public void testListenSMTP() throws Exception {
+        final ListenSMTP processor = new ListenSMTP();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-        TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+        final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
         runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
 
-        runner.assertValid();
-        runner.run(5, false);
+        runner.run(1, false);
+
+        assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+        final Properties config = new Properties();
+        config.put("mail.smtp.host", "localhost");
+        config.put("mail.smtp.port", String.valueOf(port));
+        config.put("mail.smtp.connectiontimeout", "5000");
+        config.put("mail.smtp.timeout", "5000");
+        config.put("mail.smtp.writetimeout", "5000");
+
+        final Session session = Session.getInstance(config);
+        session.setDebug(true);
+
         final int numMessages = 5;
-        CountDownLatch latch = new CountDownLatch(numMessages);
-
-        this.executor.schedule(() -> {
-            for (int i = 0; i < numMessages; i++) {
-                try {
-                    Email email = new SimpleEmail();
-                    email.setHostName("localhost");
-                    email.setSmtpPort(port);
-                    email.setFrom("alice@nifi.apache.org");
-                    email.setSubject("This is a test");
-                    email.setMsg("MSG-" + i);
-                    email.addTo("bob@nifi.apache.org");
-                    email.send();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
-                } finally {
-                    latch.countDown();
-                }
-            }
-        }, 1500, TimeUnit.MILLISECONDS);
-
-        boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < numMessages; i++) {
+            final Message email = new MimeMessage(session);
+            email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+            email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+            email.setSubject("This is a test");
+            email.setText("MSG-" + i);
+            Transport.send(email);
+        }
+
         runner.shutdown();
-        assertTrue(complete);
         runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
     }
 
     @Test
-    public void validateSuccessfulInteractionWithTls() throws Exception {
-        // TODO: Setting system properties without cleaning/restoring at the end of a test is an anti-pattern and can have side effects
-        System.setProperty("mail.smtp.ssl.trust", "*");
-        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore.jks");
-        System.setProperty("javax.net.ssl.keyStorePassword", "passwordpassword");
-        int port = NetworkUtils.availablePort();
-
-        TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+    public void testListenSMTPwithTLS() throws Exception {
+        final ListenSMTP processor = new ListenSMTP();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
         runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
 
         // Setup the SSL Context
-        SSLContextService sslContextService = new StandardRestrictedSSLContextService();
+        final SSLContextService sslContextService = new StandardRestrictedSSLContextService();
         runner.addControllerService("ssl-context", sslContextService);
         runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/truststore.jks");
         runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "passwordpassword");
@@ -116,80 +101,78 @@ public class TestListenSMTP {
         runner.setProperty(ListenSMTP.CLIENT_AUTH, SslContextFactory.ClientAuth.NONE.name());
         runner.assertValid();
 
-        int messageCount = 5;
-        CountDownLatch latch = new CountDownLatch(messageCount);
-        runner.run(messageCount, false);
-
-        this.executor.schedule(() -> {
-            for (int i = 0; i < messageCount; i++) {
-                try {
-                    Email email = new SimpleEmail();
-                    email.setHostName("localhost");
-                    email.setSmtpPort(port);
-                    email.setFrom("alice@nifi.apache.org");
-                    email.setSubject("This is a test");
-                    email.setMsg("MSG-" + i);
-                    email.addTo("bob@nifi.apache.org");
-
-                    // Enable STARTTLS but ignore the cert
-                    email.setStartTLSEnabled(true);
-                    email.setStartTLSRequired(true);
-                    email.setSSLCheckServerIdentity(false);
-                    email.send();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
-                } finally {
-                    latch.countDown();
-                }
-            }
-        }, 1500, TimeUnit.MILLISECONDS);
-
-        boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
+        runner.run(1, false);
+
+        assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+        final Properties config = new Properties();
+        config.put("mail.smtp.host", "localhost");
+        config.put("mail.smtp.port", String.valueOf(port));
+        config.put("mail.smtp.auth", "false");
+        config.put("mail.smtp.starttls.enable", "true");
+        config.put("mail.smtp.starttls.required", "true");
+        config.put("mail.smtp.ssl.trust", "*");
+        config.put("mail.smtp.connectiontimeout", "5000");
+        config.put("mail.smtp.timeout", "5000");
+        config.put("mail.smtp.writetimeout", "5000");
+
+        final Session session = Session.getInstance(config);
+        session.setDebug(true);
+
+        final int numMessages = 5;
+        for (int i = 0; i < numMessages; i++) {
+            final Message email = new MimeMessage(session);
+            email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+            email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+            email.setSubject("This is a test");
+            email.setText("MSG-" + i);
+            Transport.send(email);
+        }
+
         runner.shutdown();
-        assertTrue(complete);
-        runner.assertAllFlowFilesTransferred("success", messageCount);
+        runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
     }
 
-    @Test
-    public void validateTooLargeMessage() throws Exception {
-        int port = NetworkUtils.availablePort();
+    @Test(expected = MessagingException.class)
+    public void testListenSMTPwithTooLargeMessage() throws Exception {
+        final ListenSMTP processor = new ListenSMTP();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-        TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+        final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
         runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
         runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B");
 
-        runner.assertValid();
+        runner.run(1, false);
+
+        assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+        final Properties config = new Properties();
+        config.put("mail.smtp.host", "localhost");
+        config.put("mail.smtp.port", String.valueOf(port));
+        config.put("mail.smtp.connectiontimeout", "5000");
+        config.put("mail.smtp.timeout", "5000");
+        config.put("mail.smtp.writetimeout", "5000");
+
+        final Session session = Session.getInstance(config);
+        session.setDebug(true);
+
+        MessagingException messagingException = null;
+        try {
+            final Message email = new MimeMessage(session);
+            email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+            email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+            email.setSubject("This is a test");
+            email.setText("MSG-0");
+            Transport.send(email);
+        } catch (final MessagingException e) {
+            messagingException = e;
+        }
 
-        int messageCount = 1;
-        CountDownLatch latch = new CountDownLatch(messageCount);
-
-        runner.run(messageCount, false);
-
-        this.executor.schedule(() -> {
-            for (int i = 0; i < messageCount; i++) {
-                try {
-                    Email email = new SimpleEmail();
-                    email.setHostName("localhost");
-                    email.setSmtpPort(port);
-                    email.setFrom("alice@nifi.apache.org");
-                    email.setSubject("This is a test");
-                    email.setMsg("MSG-" + i);
-                    email.addTo("bob@nifi.apache.org");
-                    email.send();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
-                } finally {
-                    latch.countDown();
-                }
-            }
-        }, 1000, TimeUnit.MILLISECONDS);
-
-        boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
         runner.shutdown();
-        assertTrue(complete);
-        runner.assertAllFlowFilesTransferred("success", 0);
+        runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, 0);
+
+        if (messagingException != null) throw messagingException;
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index bca5aea..b6af938 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -63,6 +64,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TestListFile {
 
+    private static boolean isMillisecondSupported = false;
+
     private final String TESTDIR = "target/test/data/in";
     private final File testDir = new File(TESTDIR);
     private ListFile processor;
@@ -104,8 +107,13 @@ public class TestListFile {
     };
 
     @BeforeClass
-    public static void setupClass() {
+    public static void setupClass() throws Exception {
         Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
+
+        // This only has to be done once.
+        final File file = Files.createTempFile(Paths.get("target/"), "TestListFile", null).toFile();
+        file.setLastModified(325990917351L);
+        isMillisecondSupported = file.lastModified() % 1_000 > 0;
     }
 
     @Before
@@ -151,7 +159,6 @@ public class TestListFile {
         runner.clearTransferState();
 
         final List<File> files = listFiles(testDir);
-        final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
         final Long lagMillis;
         if (isMillisecondSupported) {
             lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
@@ -285,6 +292,11 @@ public class TestListFile {
                 assertTrue(file2.setLastModified(time2millis));
                 assertTrue(file3.setLastModified(time4millis));
             }
+
+            assertTrue(file1.lastModified() > time3millis && file1.lastModified() <= time0millis);
+            assertTrue(file2.lastModified() > time3millis && file2.lastModified() < time1millis);
+            assertTrue(file3.lastModified() < time3millis);
+
             try {
                 runNext();
             } catch (InterruptedException e) {
@@ -832,6 +844,14 @@ public class TestListFile {
         age4millis = 20000L;
         age5millis = 100000L;
 
+        // Allow for bigger gaps since the lag is 2s w/o milliseconds.
+        if (!isMillisecondSupported) {
+            age1millis *= 2;
+            age2millis *= 2;
+            age3millis *= 2;
+            age4millis *= 2;
+        }
+
         time0millis = syncTime - age0millis;
         time1millis = syncTime - age1millis;
         time2millis = syncTime - age2millis;