You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/17 00:45:53 UTC
[nifi] branch master updated: NIFI-7540: Fix TestListenSMTP and
TestListFile on macOS build environment (#4341)
This is an automated email from the ASF dual-hosted git repository.
alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c18b27a NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment (#4341)
c18b27a is described below
commit c18b27af185453816dffe6e9ed22fe9be41646f6
Author: Joey <jf...@apache.org>
AuthorDate: Tue Jun 16 19:45:37 2020 -0500
NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment (#4341)
* NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment
This also fixes NIFI-4760.
* NIFI-7540: Remove duplicate mail.smtp.starttls.enable from TestListenSMTP
Signed-off-by: Andy LoPresto <al...@apache.org>
---
.../apache/nifi/remote/io/socket/NetworkUtils.java | 37 +++-
.../nifi-email-processors/pom.xml | 3 +-
.../nifi/processors/email/TestListenSMTP.java | 229 ++++++++++-----------
.../nifi/processors/standard/TestListFile.java | 24 ++-
4 files changed, 163 insertions(+), 130 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
index b2deecc..6be3293 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
@@ -17,11 +17,12 @@
package org.apache.nifi.remote.io.socket;
import java.io.IOException;
+import java.net.Socket;
import java.net.ServerSocket;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
-/**
- *
- */
public class NetworkUtils {
/**
@@ -43,4 +44,34 @@ public class NetworkUtils {
}
}
}
+
+ /**
+  * Probes once whether a TCP connection to the given host and port can be opened.
+  *
+  * @param hostname the host to connect to
+  * @param port the TCP port to connect to
+  * @return {@code true} if a socket could be opened and reports itself connected;
+  *         {@code false} if any exception occurred while opening or closing it
+  */
+ public final static boolean isListening(final String hostname, final int port) {
+     try (final Socket probe = new Socket(hostname, port)) {
+         return probe.isConnected();
+     } catch (final Exception ignored) {
+         // Best-effort probe: every failure is reported as "not listening".
+         return false;
+     }
+ }
+
+ /**
+  * Polls until a TCP connection to the given host and port can be opened, or the timeout elapses.
+  * The probe is retried on a background thread with a 100 ms pause between attempts.
+  *
+  * @param hostname the host to connect to
+  * @param port the TCP port to connect to
+  * @param timeoutMillis the maximum time to wait, in milliseconds
+  * @return {@code true} if a connection succeeded within the timeout; {@code false} on timeout,
+  *         on failure of the polling task, or if the calling thread was interrupted while waiting
+  */
+ public final static boolean isListening(final String hostname, final int port, final int timeoutMillis) {
+     final ExecutorService executor = Executors.newSingleThreadExecutor();
+     try {
+         final Boolean result = executor.submit(() -> {
+             while (!isListening(hostname, port)) {
+                 // Deliberately not caught: the interrupt sent by shutdownNow() must end this loop,
+                 // otherwise the task would keep polling forever after the caller has timed out.
+                 Thread.sleep(100);
+             }
+             return true;
+         }).get(timeoutMillis, TimeUnit.MILLISECONDS);
+         return result != null && result;
+     } catch (final InterruptedException e) {
+         // Preserve the caller's interrupt status instead of silently clearing it.
+         Thread.currentThread().interrupt();
+         return false;
+     } catch (final Exception ignored) {
+         // Timeout or failure of the polling task: report "not listening".
+         return false;
+     } finally {
+         // shutdown() would leave a still-running poll loop alive; shutdownNow() interrupts it
+         // so the worker thread terminates.
+         executor.shutdownNow();
+     }
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
index 2b693d0..29a21d9 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
@@ -54,7 +54,7 @@
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
- <version>1.5.6</version>
+ <version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
@@ -125,4 +125,3 @@
</dependency>
</dependencies>
</project>
-
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
index 325bfaf..2e6c783 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
@@ -18,12 +18,15 @@ package org.apache.nifi.processors.email;
import static org.junit.Assert.assertTrue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.mail.Email;
-import org.apache.commons.mail.SimpleEmail;
+import java.util.Properties;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
@@ -31,77 +34,59 @@ import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
+
import org.junit.Test;
public class TestListenSMTP {
- private ScheduledExecutorService executor;
-
- @Before
- public void before() {
- this.executor = Executors.newScheduledThreadPool(2);
- }
-
- @After
- public void after() {
- this.executor.shutdown();
- }
-
@Test
- public void validateSuccessfulInteraction() throws Exception {
- int port = NetworkUtils.availablePort();
+ public void testListenSMTP() throws Exception {
+ final ListenSMTP processor = new ListenSMTP();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
- TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+ final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
- runner.assertValid();
- runner.run(5, false);
+ runner.run(1, false);
+
+ assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+ final Properties config = new Properties();
+ config.put("mail.smtp.host", "localhost");
+ config.put("mail.smtp.port", String.valueOf(port));
+ config.put("mail.smtp.connectiontimeout", "5000");
+ config.put("mail.smtp.timeout", "5000");
+ config.put("mail.smtp.writetimeout", "5000");
+
+ final Session session = Session.getInstance(config);
+ session.setDebug(true);
+
final int numMessages = 5;
- CountDownLatch latch = new CountDownLatch(numMessages);
-
- this.executor.schedule(() -> {
- for (int i = 0; i < numMessages; i++) {
- try {
- Email email = new SimpleEmail();
- email.setHostName("localhost");
- email.setSmtpPort(port);
- email.setFrom("alice@nifi.apache.org");
- email.setSubject("This is a test");
- email.setMsg("MSG-" + i);
- email.addTo("bob@nifi.apache.org");
- email.send();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- latch.countDown();
- }
- }
- }, 1500, TimeUnit.MILLISECONDS);
-
- boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < numMessages; i++) {
+ final Message email = new MimeMessage(session);
+ email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+ email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+ email.setSubject("This is a test");
+ email.setText("MSG-" + i);
+ Transport.send(email);
+ }
+
runner.shutdown();
- assertTrue(complete);
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
}
@Test
- public void validateSuccessfulInteractionWithTls() throws Exception {
- // TODO: Setting system properties without cleaning/restoring at the end of a test is an anti-pattern and can have side effects
- System.setProperty("mail.smtp.ssl.trust", "*");
- System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore.jks");
- System.setProperty("javax.net.ssl.keyStorePassword", "passwordpassword");
- int port = NetworkUtils.availablePort();
-
- TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+ public void testListenSMTPwithTLS() throws Exception {
+ final ListenSMTP processor = new ListenSMTP();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+
+ final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
// Setup the SSL Context
- SSLContextService sslContextService = new StandardRestrictedSSLContextService();
+ final SSLContextService sslContextService = new StandardRestrictedSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/truststore.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "passwordpassword");
@@ -116,80 +101,78 @@ public class TestListenSMTP {
runner.setProperty(ListenSMTP.CLIENT_AUTH, SslContextFactory.ClientAuth.NONE.name());
runner.assertValid();
- int messageCount = 5;
- CountDownLatch latch = new CountDownLatch(messageCount);
- runner.run(messageCount, false);
-
- this.executor.schedule(() -> {
- for (int i = 0; i < messageCount; i++) {
- try {
- Email email = new SimpleEmail();
- email.setHostName("localhost");
- email.setSmtpPort(port);
- email.setFrom("alice@nifi.apache.org");
- email.setSubject("This is a test");
- email.setMsg("MSG-" + i);
- email.addTo("bob@nifi.apache.org");
-
- // Enable STARTTLS but ignore the cert
- email.setStartTLSEnabled(true);
- email.setStartTLSRequired(true);
- email.setSSLCheckServerIdentity(false);
- email.send();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- latch.countDown();
- }
- }
- }, 1500, TimeUnit.MILLISECONDS);
-
- boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
+ runner.run(1, false);
+
+ assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+ final Properties config = new Properties();
+ config.put("mail.smtp.host", "localhost");
+ config.put("mail.smtp.port", String.valueOf(port));
+ config.put("mail.smtp.auth", "false");
+ config.put("mail.smtp.starttls.enable", "true");
+ config.put("mail.smtp.starttls.required", "true");
+ config.put("mail.smtp.ssl.trust", "*");
+ config.put("mail.smtp.connectiontimeout", "5000");
+ config.put("mail.smtp.timeout", "5000");
+ config.put("mail.smtp.writetimeout", "5000");
+
+ final Session session = Session.getInstance(config);
+ session.setDebug(true);
+
+ final int numMessages = 5;
+ for (int i = 0; i < numMessages; i++) {
+ final Message email = new MimeMessage(session);
+ email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+ email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+ email.setSubject("This is a test");
+ email.setText("MSG-" + i);
+ Transport.send(email);
+ }
+
runner.shutdown();
- assertTrue(complete);
- runner.assertAllFlowFilesTransferred("success", messageCount);
+ runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
}
- @Test
- public void validateTooLargeMessage() throws Exception {
- int port = NetworkUtils.availablePort();
+ @Test(expected = MessagingException.class)
+ public void testListenSMTPwithTooLargeMessage() throws Exception {
+ final ListenSMTP processor = new ListenSMTP();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
- TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
+ final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B");
- runner.assertValid();
+ runner.run(1, false);
+
+ assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));
+
+ final Properties config = new Properties();
+ config.put("mail.smtp.host", "localhost");
+ config.put("mail.smtp.port", String.valueOf(port));
+ config.put("mail.smtp.connectiontimeout", "5000");
+ config.put("mail.smtp.timeout", "5000");
+ config.put("mail.smtp.writetimeout", "5000");
+
+ final Session session = Session.getInstance(config);
+ session.setDebug(true);
+
+ MessagingException messagingException = null;
+ try {
+ final Message email = new MimeMessage(session);
+ email.setFrom(new InternetAddress("alice@nifi.apache.org"));
+ email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
+ email.setSubject("This is a test");
+ email.setText("MSG-0");
+ Transport.send(email);
+ } catch (final MessagingException e) {
+ messagingException = e;
+ }
- int messageCount = 1;
- CountDownLatch latch = new CountDownLatch(messageCount);
-
- runner.run(messageCount, false);
-
- this.executor.schedule(() -> {
- for (int i = 0; i < messageCount; i++) {
- try {
- Email email = new SimpleEmail();
- email.setHostName("localhost");
- email.setSmtpPort(port);
- email.setFrom("alice@nifi.apache.org");
- email.setSubject("This is a test");
- email.setMsg("MSG-" + i);
- email.addTo("bob@nifi.apache.org");
- email.send();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- latch.countDown();
- }
- }
- }, 1000, TimeUnit.MILLISECONDS);
-
- boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
- assertTrue(complete);
- runner.assertAllFlowFilesTransferred("success", 0);
+ runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, 0);
+
+ if (messagingException != null) throw messagingException;
}
+
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index bca5aea..b6af938 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -63,6 +64,8 @@ import static org.junit.Assert.assertTrue;
public class TestListFile {
+ private static boolean isMillisecondSupported = false;
+
private final String TESTDIR = "target/test/data/in";
private final File testDir = new File(TESTDIR);
private ListFile processor;
@@ -104,8 +107,13 @@ public class TestListFile {
};
@BeforeClass
- public static void setupClass() {
+ public static void setupClass() throws Exception {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
+
+ // This only has to be done once.
+ final File file = Files.createTempFile(Paths.get("target/"), "TestListFile", null).toFile();
+ file.setLastModified(325990917351L);
+ isMillisecondSupported = file.lastModified() % 1_000 > 0;
}
@Before
@@ -151,7 +159,6 @@ public class TestListFile {
runner.clearTransferState();
final List<File> files = listFiles(testDir);
- final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
final Long lagMillis;
if (isMillisecondSupported) {
lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
@@ -285,6 +292,11 @@ public class TestListFile {
assertTrue(file2.setLastModified(time2millis));
assertTrue(file3.setLastModified(time4millis));
}
+
+ assertTrue(file1.lastModified() > time3millis && file1.lastModified() <= time0millis);
+ assertTrue(file2.lastModified() > time3millis && file2.lastModified() < time1millis);
+ assertTrue(file3.lastModified() < time3millis);
+
try {
runNext();
} catch (InterruptedException e) {
@@ -832,6 +844,14 @@ public class TestListFile {
age4millis = 20000L;
age5millis = 100000L;
+ // Allow for bigger gaps since the lag is 2s w/o milliseconds.
+ if (!isMillisecondSupported) {
+ age1millis *= 2;
+ age2millis *= 2;
+ age3millis *= 2;
+ age4millis *= 2;
+ }
+
time0millis = syncTime - age0millis;
time1millis = syncTime - age1millis;
time2millis = syncTime - age2millis;