You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/21 14:49:53 UTC
[3/3] nifi git commit: NIFI-1899 reworking bi-directional Email
processing in ListenSMTP
NIFI-1899 reworking bi-directional Email processing in ListenSMTP
This closes #483
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e224c28
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e224c28
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e224c28
Branch: refs/heads/master
Commit: 4e224c283f84e25c08ad4371f6ab2c323c02dd25
Parents: 4f67283
Author: jpercivall <jo...@yahoo.com>
Authored: Sat Jul 16 18:29:41 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Thu Jul 21 10:39:21 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/email/ListenSMTP.java | 156 ++++++++++++-------
.../processors/email/smtp/event/SmtpEvent.java | 19 +--
.../smtp/handler/SMTPMessageHandlerFactory.java | 147 ++++++++---------
.../email/smtp/handler/SMTPResultCode.java | 12 +-
.../nifi/processors/email/TestListenSMTP.java | 57 ++++---
5 files changed, 214 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
index 51b1d2d..0f17838 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
@@ -20,7 +20,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
@@ -44,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -64,7 +63,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
@@ -356,87 +354,125 @@ public class ListenSMTP extends AbstractProcessor {
while (!incomingMessages.isEmpty()) {
SmtpEvent message = incomingMessages.poll();
-
if (message == null) {
return;
}
synchronized (message) {
-
- FlowFile flowfile = session.create();
-
- if (message.getMessageData() != null) {
- ByteArrayOutputStream messageData = message.getMessageData();
- flowfile = session.write(flowfile, new OutputStreamCallback() {
-
- // Write the messageData to flowfile content
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(messageData.toByteArray());
- }
- });
+ if (resultCodeSetAndIsError(message)) {
+ SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+ getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage());
+ continue;
}
- HashMap<String, String> attributes = new HashMap<>();
- // Gather message attributes
- attributes.put(SMTP_HELO, message.getHelo());
- attributes.put(SMTP_SRC_IP, message.getHelo());
- attributes.put(SMTP_FROM, message.getFrom());
- attributes.put(SMTP_TO, message.getTo());
+ try {
+ FlowFile flowfile = session.create();
+
+ if (message.getMessageData() != null) {
+ flowfile = session.write(flowfile, out -> {
+ InputStream inputStream = message.getMessageData();
+ byte [] buffer = new byte[1024];
+
+ int rd;
+ long totalBytesRead =0;
+
+ while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
+ totalBytesRead += rd;
+ if (totalBytesRead > server.getMaxMessageSize() ) {
+ message.setReturnCode(500);
+ message.setProcessed();
+ break;
+ }
+ out.write(buffer, 0, rd);
+ }
+ out.flush();
+ });
+ } else {
+ getLogger().debug("Message body was null");
+ message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
+ message.setProcessed();
+ }
+
+ if (!message.getProcessed()) {
+ HashMap<String, String> attributes = new HashMap<>();
+ // Gather message attributes
+ attributes.put(SMTP_HELO, message.getHelo());
+ attributes.put(SMTP_SRC_IP, message.getHelo());
+ attributes.put(SMTP_FROM, message.getFrom());
+ attributes.put(SMTP_TO, message.getTo());
+
+ List<Map<String, String>> details = message.getCertifcateDetails();
+ int c = 0;
+
+ // Add a selection of each X509 certificates to the already gathered attributes
+
+ for (Map<String, String> detail : details) {
+ attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
+ attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
+ c++;
+ }
- List<Map<String, String>> details = message.getCertifcateDetails();
- int c = 0;
+ // Set Mime-Type
+ attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
- // Add a selection of each X509 certificates to the already gathered attributes
+ // Add the attributes. to flowfile
+ flowfile = session.putAllAttributes(flowfile, attributes);
+ session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
+ session.transfer(flowfile, REL_SUCCESS);
- for (Map<String, String> detail : details) {
- attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
- attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
- c++;
+ getLogger().info("Transferring {} to success", new Object[]{flowfile});
+ }
+ } catch (Exception e) {
+ message.setProcessed();
+ message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode());
}
- // Set Mime-Type
- attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
-
- // Add the attributes. to flowfile
- flowfile = session.putAllAttributes(flowfile, attributes);
- session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
- session.transfer(flowfile, REL_SUCCESS);
- getLogger().info("Transferring {} to success", new Object[]{flowfile});
+ // Check to see if it failed when creating the FlowFile
+ if (resultCodeSetAndIsError(message)) {
+ session.rollback();
+ SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+ getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
+ message.notifyAll();
+ continue;
+ }
// Finished processing,
message.setProcessed();
- // update the latch so data() can process the rest of the method
- message.updateProcessedLatch();
+ // notify on the message so data() can process the rest of the method
+ message.notifyAll();
- // End of synchronized block
- }
-
- // Wait for SMTPMessageHandler data() and done() to complete
- // their side of the work (i.e. acknowledgement)
- while (!message.getAcknowledged()) {
- // Busy wait
+ // Wait for data() to tell sender we received the message and double check we didn't timeout
+ final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ try {
+ message.wait(serverTimeout);
+ } catch (InterruptedException e) {
+ getLogger().info("Interrupted while waiting for Message Handler to acknowledge message.");
}
- // Lock one last time
- synchronized (message) {
- SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
- switch (resultCode) {
- case UNEXPECTED_ERROR:
- case TIMEOUT_ERROR:
- session.rollback();
- getLogger().warn(resultCode.getLogMessage());
- case SUCCESS:
- getLogger().info(resultCode.getLogMessage());
- break;
- default:
- getLogger().error(resultCode.getLogMessage());
+ // Check to see if the sender was correctly notified
+ if (resultCodeSetAndIsError(message)) {
+ SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+ session.rollback();
+ getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
+ } else {
+ // Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss.
+ session.commit();
}
}
}
}
+ private boolean resultCodeSetAndIsError(SmtpEvent message){
+ if (message.getReturnCode() != null ) {
+ SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+ if (resultCode.isError()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
// Same old... same old... used for testing to access the random port that was selected
protected int getPort() {
return server == null ? 0 : server.getPort();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
index eaded4a..e1c36c5 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
@@ -18,14 +18,13 @@
package org.apache.nifi.processors.email.smtp.event;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,19 +37,15 @@ public class SmtpEvent{
private final String helo;
private final String from;
private final String to;
- private final ByteArrayOutputStream messageData;
+ private final InputStream messageData;
private List<Map<String, String>> certificatesDetails;
private AtomicBoolean processed = new AtomicBoolean(false);
private AtomicBoolean acknowledged = new AtomicBoolean(false);
private AtomicInteger returnCode = new AtomicInteger();
- private CountDownLatch processedLatch;
public SmtpEvent(
final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
- final ByteArrayOutputStream messageData,
- CountDownLatch processedLatch) {
-
- this.processedLatch = processedLatch;
+ final InputStream messageData) {
this.remoteIP = remoteIP;
this.helo = helo;
@@ -86,7 +81,7 @@ public class SmtpEvent{
return helo;
}
- public synchronized ByteArrayOutputStream getMessageData() {
+ public synchronized InputStream getMessageData() {
return messageData;
}
@@ -118,15 +113,11 @@ public class SmtpEvent{
return this.acknowledged.get();
}
- public synchronized void updateProcessedLatch() {
- this.processedLatch.countDown();
- }
-
public synchronized void setReturnCode(int code) {
this.returnCode.set(code);
}
- public synchronized int getReturnCode() {
+ public synchronized Integer getReturnCode() {
return this.returnCode.get();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
index 0ac4127..6b647bf 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
@@ -20,12 +20,10 @@ package org.apache.nifi.processors.email.smtp.handler;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.X509Certificate;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.DropConnectionException;
import org.subethamail.smtp.MessageContext;
@@ -33,7 +31,6 @@ import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
-import org.subethamail.smtp.server.SMTPServer;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
@@ -57,13 +54,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
final MessageContext messageContext;
String from;
String recipient;
- ByteArrayOutputStream messageData;
-
- private CountDownLatch latch;
public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
this.messageContext = messageContext;
- this.latch = new CountDownLatch(1);
}
@Override
@@ -82,31 +75,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
// Start counting the timer...
StopWatch watch = new StopWatch(true);
-
long elapsed;
-
- SMTPServer server = messageContext.getSMTPServer();
-
final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS);
- this.messageData = new ByteArrayOutputStream();
-
- byte [] buffer = new byte[1024];
-
- int rd;
-
- while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
- messageData.write(buffer, 0, rd);
- if (messageData.getBufferLength() > server.getMaxMessageSize() ) {
- // NOTE: Setting processed at this stage is not desirable as message object will only be created
- // if this test (i.e. message size) passes.
- final SMTPResultCode returnCode = SMTPResultCode.fromCode(500);
- logger.warn(returnCode.getLogMessage());
- throw new TooMuchDataException(returnCode.getErrorMessage());
- }
- }
- messageData.flush();
-
X509Certificate[] certificates = new X509Certificate[]{};
final String remoteIP = messageContext.getRemoteAddress().toString();
@@ -116,67 +87,79 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
}
- SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch);
+ SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream);
+
+ synchronized (message) {
+ // / Try to queue the message back to the NiFi session
+ try {
+ elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
+ incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
+ logger.trace(returnCode.getLogMessage());
+
+ // NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of
+ // adding message to the processing queue. Yet, for the sake of consistency the message is
+ // updated nonetheless
+ message.setReturnCode(returnCode.getCode());
+ message.setAcknowledged();
+ throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+ }
- // / Try to queue the message back to the NiFi session
- try {
+ // Once message has been sent to the queue, it should be processed by NiFi onTrigger,
+ // a flowfile created and its processed status updated before an acknowledgment is
+ // given back to the SMTP client
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
- incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
- logger.trace(returnCode.getLogMessage());
-
- // NOTE: Setting processed at this stage is redundant as this catch deals with the inability of
- // adding message to the processing queue. Yet, for the sake of consistency the message is
- // updated nonetheless
- message.setReturnCode(returnCode.getCode());
- message.setAcknowledged();
- throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
- }
-
- // Once message has been sent to the queue, it should be processed by NiFi onTrigger,
- // a flowfile created and its processed status updated before an acknowledgment is
- // given back to the SMTP client
- elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
- try {
- latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // Latch open unexpectedly. Will return error and requestonTrigger to rollback
- logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure");
- incomingMessages.remove(message);
-
- // Set the final values so onTrigger can figure out what happened to message
- final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
- message.setReturnCode(returnCode.getCode());
- message.setAcknowledged();
-
- // Inform client
- throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
- }
+ try {
+ message.wait(serverTimeout - elapsed);
+ } catch (InterruptedException e) {
+ // Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback
+ logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure");
+ incomingMessages.remove(message);
+
+ // Set the final values so onTrigger can figure out what happened to message
+ final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
+ message.setReturnCode(returnCode.getCode());
+ message.setAcknowledged();
+
+ // Inform client
+ throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+ }
- // Remove the message from the queue.
- incomingMessages.remove(message);
- // Check if message is processed and if yes, check if it was received on time and wraps it up.
- elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
- if (!message.getProcessed() || (elapsed >= serverTimeout)) {
- final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
- logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred.");
-
- // Set the final values so onTrigger can figure out what happened to message
- message.setReturnCode(returnCode.getCode());
- message.setAcknowledged();
- throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+ // Check if message is processed
+ if (!message.getProcessed()) {
+ incomingMessages.remove(message);
+ final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
+ logger.trace("Did not receive the onTrigger response within the acceptable timeframe.");
+
+ // Set the final values so onTrigger can figure out what happened to message
+ message.setReturnCode(returnCode.getCode());
+ message.setAcknowledged();
+ throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+ } else if(message.getReturnCode() != null) {
+ // No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger.
+ final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode());
+
+ if(returnCode.isError()){
+ message.setAcknowledged();
+ throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+ }
+ } else {
+ // onTrigger successfully processed the data.
+ // No need to check if over server timeout because we already processed the data. Might as well finalize it.
+ // Set the final values so onTrigger can figure out what happened to message
+ message.setReturnCode(250);
+ message.setAcknowledged();
+ }
+ // Exit, allowing Handler to acknowledge the message
+ message.notifyAll();
}
-
- // Set the final values so onTrigger can figure out what happened to message
- message.setReturnCode(250);
- message.setAcknowledged();
- // Exit, allowing Handler to acknowledge the message
- }
+ }
@Override
public void done() {
logger.trace("Called the last method of message handler. Exiting");
+ // Notifying the ontrigger that the message was handled.
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
index 5328a0d..b9c0d60 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
@@ -79,5 +79,15 @@ public enum SMTPResultCode {
return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
}
-
+ public boolean isError(){
+ switch (this) {
+ case MESSAGE_TOO_LARGE:
+ case UNEXPECTED_ERROR:
+ case QUEUE_ERROR:
+ case TIMEOUT_ERROR:
+ return true;
+ default:
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
index 983ac4a..1fd7628 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TestListenSMTP {
@@ -251,7 +252,7 @@ public class TestListenSMTP {
listenSmtp.startShutdown();
}
- @Test(timeout=15000, expected=EmailException.class)
+ @Test(timeout=15000)
public void emailTooLarge() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
@@ -272,31 +273,47 @@ public class TestListenSMTP {
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
+ AtomicBoolean finished = new AtomicBoolean(false);;
+ AtomicBoolean failed = new AtomicBoolean(false);
- Email email = new SimpleEmail();
- email.setHostName("127.0.0.1");
- email.setSmtpPort(port);
- email.setStartTLSEnabled(false);
- email.setFrom("alice@nifi.apache.org");
- email.setSubject("This is a test");
- email.setMsg("Test test test chocolate");
- email.addTo("bob@nifi.apache.org");
- email.send();
+ try {
+ final Thread clientThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Email email = new SimpleEmail();
+ email.setHostName("127.0.0.1");
+ email.setSmtpPort(port);
+ email.setStartTLSEnabled(false);
+ email.setFrom("alice@nifi.apache.org");
+ email.setSubject("This is a test");
+ email.setMsg("Test test test chocolate");
+ email.addTo("bob@nifi.apache.org");
+ email.send();
- Thread.sleep(100);
+ } catch (final EmailException t) {
+ failed.set(true);
+ }
+ finished.set(true);
+ }
+ });
+ clientThread.start();
+ while (!finished.get()) {
+ // process the request.
+ listenSmtp.onTrigger(context, processSessionFactory);
+ Thread.sleep(10);
+ }
+ clientThread.stop();
- // process the request.
- listenSmtp.onTrigger(context, processSessionFactory);
+ Assert.assertTrue("Sending email succeeded when it should have failed", failed.get());
- runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
- runner.assertQueueEmpty();
+ runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
- try {
- listenSmtp.startShutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.assertFalse(e.toString(), true);
+ runner.assertQueueEmpty();
+ } finally {
+ // shut down the server
+ listenSmtp.startShutdown();
}
}
}
\ No newline at end of file