You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/21 14:49:53 UTC

[3/3] nifi git commit: NIFI-1899 reworking bi-directional Email processing in ListenSMTP

NIFI-1899 reworking bi-directional Email processing in ListenSMTP

This closes #483


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e224c28
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e224c28
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e224c28

Branch: refs/heads/master
Commit: 4e224c283f84e25c08ad4371f6ab2c323c02dd25
Parents: 4f67283
Author: jpercivall <jo...@yahoo.com>
Authored: Sat Jul 16 18:29:41 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Thu Jul 21 10:39:21 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/email/ListenSMTP.java       | 156 ++++++++++++-------
 .../processors/email/smtp/event/SmtpEvent.java  |  19 +--
 .../smtp/handler/SMTPMessageHandlerFactory.java | 147 ++++++++---------
 .../email/smtp/handler/SMTPResultCode.java      |  12 +-
 .../nifi/processors/email/TestListenSMTP.java   |  57 ++++---
 5 files changed, 214 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
index 51b1d2d..0f17838 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
@@ -20,7 +20,7 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -44,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -64,7 +63,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.ssl.SSLContextService;
 
 import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
@@ -356,87 +354,125 @@ public class ListenSMTP extends AbstractProcessor {
         while (!incomingMessages.isEmpty()) {
             SmtpEvent message = incomingMessages.poll();
 
-
             if (message == null) {
                 return;
             }
 
             synchronized (message) {
-
-                FlowFile flowfile = session.create();
-
-                if (message.getMessageData() != null) {
-                    ByteArrayOutputStream messageData = message.getMessageData();
-                    flowfile = session.write(flowfile, new OutputStreamCallback() {
-
-                        // Write the messageData to flowfile content
-                        @Override
-                        public void process(OutputStream out) throws IOException {
-                            out.write(messageData.toByteArray());
-                        }
-                    });
+                if (resultCodeSetAndIsError(message)) {
+                    SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+                    getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage());
+                    continue;
                 }
 
-                HashMap<String, String> attributes = new HashMap<>();
-                // Gather message attributes
-                attributes.put(SMTP_HELO, message.getHelo());
-                attributes.put(SMTP_SRC_IP, message.getHelo());
-                attributes.put(SMTP_FROM, message.getFrom());
-                attributes.put(SMTP_TO, message.getTo());
+                try {
+                    FlowFile flowfile = session.create();
+
+                    if (message.getMessageData() != null) {
+                        flowfile = session.write(flowfile, out -> {
+                            InputStream inputStream = message.getMessageData();
+                            byte [] buffer = new byte[1024];
+
+                            int rd;
+                            long totalBytesRead =0;
+
+                            while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
+                                totalBytesRead += rd;
+                                if (totalBytesRead > server.getMaxMessageSize() ) {
+                                    message.setReturnCode(500);
+                                    message.setProcessed();
+                                    break;
+                                }
+                                out.write(buffer, 0, rd);
+                            }
+                            out.flush();
+                        });
+                    } else {
+                        getLogger().debug("Message body was null");
+                        message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
+                        message.setProcessed();
+                    }
+
+                    if (!message.getProcessed()) {
+                        HashMap<String, String> attributes = new HashMap<>();
+                        // Gather message attributes
+                        attributes.put(SMTP_HELO, message.getHelo());
+                        attributes.put(SMTP_SRC_IP, message.getHelo());
+                        attributes.put(SMTP_FROM, message.getFrom());
+                        attributes.put(SMTP_TO, message.getTo());
+
+                        List<Map<String, String>> details = message.getCertifcateDetails();
+                        int c = 0;
+
+                        // Add a selection of each X509 certificates to the already gathered attributes
+
+                        for (Map<String, String> detail : details) {
+                            attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
+                            attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
+                            c++;
+                        }
 
-                List<Map<String, String>> details = message.getCertifcateDetails();
-                int c = 0;
+                        // Set Mime-Type
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
 
-                // Add a selection of each X509 certificates to the already gathered attributes
+                        // Add the attributes to the flowfile
+                        flowfile = session.putAllAttributes(flowfile, attributes);
+                        session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
+                        session.transfer(flowfile, REL_SUCCESS);
 
-                for (Map<String, String> detail : details) {
-                    attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
-                    attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
-                    c++;
+                        getLogger().info("Transferring {} to success", new Object[]{flowfile});
+                    }
+                } catch (Exception e) {
+                    message.setProcessed();
+                    message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode());
                 }
 
-                // Set Mime-Type
-                attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
-
-                // Add the attributes. to flowfile
-                flowfile = session.putAllAttributes(flowfile, attributes);
-                session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
-                session.transfer(flowfile, REL_SUCCESS);
-                getLogger().info("Transferring {} to success", new Object[]{flowfile});
+                // Check to see if it failed when creating the FlowFile
+                if (resultCodeSetAndIsError(message)) {
+                    session.rollback();
+                    SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+                    getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
+                    message.notifyAll();
+                    continue;
+                }
 
                 // Finished processing,
                 message.setProcessed();
 
-                // update the latch so data() can process the rest of the method
-                message.updateProcessedLatch();
+                // notify on the message so data() can process the rest of the method
+                message.notifyAll();
 
-                // End of synchronized block
-            }
-
-            // Wait for SMTPMessageHandler data() and done() to complete
-            // their side of the work (i.e. acknowledgement)
-            while (!message.getAcknowledged()) {
-                // Busy wait
+                // Wait for data() to tell sender we received the message and double check we didn't timeout
+                final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+                try {
+                    message.wait(serverTimeout);
+                } catch (InterruptedException e) {
+                    getLogger().info("Interrupted while waiting for Message Handler to acknowledge message.");
                 }
 
-            // Lock one last time
-            synchronized (message) {
-                SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
-                switch (resultCode) {
-                    case UNEXPECTED_ERROR:
-                    case TIMEOUT_ERROR:
-                        session.rollback();
-                        getLogger().warn(resultCode.getLogMessage());
-                    case SUCCESS:
-                        getLogger().info(resultCode.getLogMessage());
-                        break;
-                    default:
-                        getLogger().error(resultCode.getLogMessage());
+                // Check to see if the sender was correctly notified
+                if (resultCodeSetAndIsError(message)) {
+                    SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+                    session.rollback();
+                    getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
+                } else {
+                    // Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss.
+                    session.commit();
                 }
             }
         }
     }
 
+    private boolean resultCodeSetAndIsError(SmtpEvent message){
+        if (message.getReturnCode() != null ) {
+            SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
+            if (resultCode.isError()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     // Same old... same old... used for testing to access the random port that was selected
     protected int getPort() {
         return server == null ? 0 : server.getPort();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
index eaded4a..e1c36c5 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java
@@ -18,14 +18,13 @@
 package org.apache.nifi.processors.email.smtp.event;
 
 
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
+import java.io.InputStream;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,19 +37,15 @@ public class SmtpEvent{
     private final String helo;
     private final String from;
     private final String to;
-    private final ByteArrayOutputStream messageData;
+    private final InputStream messageData;
     private List<Map<String, String>> certificatesDetails;
     private AtomicBoolean processed = new AtomicBoolean(false);
     private AtomicBoolean acknowledged = new AtomicBoolean(false);
     private AtomicInteger returnCode = new AtomicInteger();
-    private CountDownLatch processedLatch;
 
     public SmtpEvent(
             final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
-            final ByteArrayOutputStream messageData,
-            CountDownLatch processedLatch) {
-
-        this.processedLatch = processedLatch;
+            final InputStream messageData) {
 
         this.remoteIP = remoteIP;
         this.helo = helo;
@@ -86,7 +81,7 @@ public class SmtpEvent{
         return helo;
     }
 
-    public synchronized ByteArrayOutputStream getMessageData() {
+    public synchronized InputStream getMessageData() {
         return messageData;
     }
 
@@ -118,15 +113,11 @@ public class SmtpEvent{
         return this.acknowledged.get();
     }
 
-    public synchronized void updateProcessedLatch() {
-        this.processedLatch.countDown();
-    }
-
     public synchronized void setReturnCode(int code) {
         this.returnCode.set(code);
     }
 
-    public synchronized int getReturnCode() {
+    public synchronized Integer getReturnCode() {
         return this.returnCode.get();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
index 0ac4127..6b647bf 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
@@ -20,12 +20,10 @@ package org.apache.nifi.processors.email.smtp.handler;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.StopWatch;
 import org.subethamail.smtp.DropConnectionException;
 import org.subethamail.smtp.MessageContext;
@@ -33,7 +31,6 @@ import org.subethamail.smtp.MessageHandler;
 import org.subethamail.smtp.MessageHandlerFactory;
 import org.subethamail.smtp.RejectException;
 import org.subethamail.smtp.TooMuchDataException;
-import org.subethamail.smtp.server.SMTPServer;
 
 import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
 
@@ -57,13 +54,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
         final MessageContext messageContext;
         String from;
         String recipient;
-        ByteArrayOutputStream messageData;
-
-        private CountDownLatch latch;
 
         public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
             this.messageContext = messageContext;
-            this.latch =  new CountDownLatch(1);
         }
 
         @Override
@@ -82,31 +75,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
         public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
             // Start counting the timer...
             StopWatch watch = new StopWatch(true);
-
             long elapsed;
-
-            SMTPServer server = messageContext.getSMTPServer();
-
             final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS);
 
-            this.messageData = new ByteArrayOutputStream();
-
-            byte [] buffer = new byte[1024];
-
-            int rd;
-
-            while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
-                messageData.write(buffer, 0, rd);
-                if (messageData.getBufferLength() > server.getMaxMessageSize() ) {
-                    // NOTE: Setting processed at this stage is not desirable as message object will only be created
-                    // if this test (i.e. message size) passes.
-                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(500);
-                    logger.warn(returnCode.getLogMessage());
-                    throw new TooMuchDataException(returnCode.getErrorMessage());
-                }
-            }
-            messageData.flush();
-
             X509Certificate[] certificates = new X509Certificate[]{};
 
             final String remoteIP = messageContext.getRemoteAddress().toString();
@@ -116,67 +87,79 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
                 certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
             }
 
-            SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch);
+            SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream);
+
+            synchronized (message) {
+                // Try to queue the message back to the NiFi session
+                try {
+                    elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
+                    incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
+                    logger.trace(returnCode.getLogMessage());
+
+                    // NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of
+                    // adding message to the processing queue. Yet, for the sake of consistency the message is
+                    // updated nonetheless
+                    message.setReturnCode(returnCode.getCode());
+                    message.setAcknowledged();
+                    throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+                }
 
-            // / Try to queue the message back to the NiFi session
-            try {
+                // Once message has been sent to the queue, it should be processed by NiFi onTrigger,
+                // a flowfile created and its processed status updated before an acknowledgment is
+                // given back to the SMTP client
                 elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
-                incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
-                logger.trace(returnCode.getLogMessage());
-
-                // NOTE: Setting processed at this stage is redundant as this catch deals with the inability of
-                // adding message to the processing queue. Yet, for the sake of consistency the message is
-                // updated nonetheless
-                message.setReturnCode(returnCode.getCode());
-                message.setAcknowledged();
-                throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
-            }
-
-            // Once message has been sent to the queue, it should be processed by NiFi onTrigger,
-            // a flowfile created and its processed status updated before an acknowledgment is
-            // given back to the SMTP client
-            elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
-            try {
-                latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                // Latch open unexpectedly. Will return error and requestonTrigger to rollback
-                logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure");
-                incomingMessages.remove(message);
-
-                // Set the final values so onTrigger can figure out what happened to message
-                final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
-                message.setReturnCode(returnCode.getCode());
-                message.setAcknowledged();
-
-                // Inform client
-                throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
-            }
+                try {
+                    message.wait(serverTimeout - elapsed);
+                } catch (InterruptedException e) {
+                    // Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback
+                    logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure");
+                    incomingMessages.remove(message);
+
+                    // Set the final values so onTrigger can figure out what happened to message
+                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
+                    message.setReturnCode(returnCode.getCode());
+                    message.setAcknowledged();
+
+                    // Inform client
+                    throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+                }
 
-            // Remove the message from the queue.
-            incomingMessages.remove(message);
-            // Check if message is processed and if yes, check if it was received on time and wraps it up.
-            elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
-            if (!message.getProcessed() ||  (elapsed >= serverTimeout)) {
-                final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
-                logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred.");
-
-                // Set the final values so onTrigger can figure out what happened to message
-                message.setReturnCode(returnCode.getCode());
-                message.setAcknowledged();
-                throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+                // Check if message is processed
+                if (!message.getProcessed()) {
+                    incomingMessages.remove(message);
+                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
+                    logger.trace("Did not receive the onTrigger response within the acceptable timeframe.");
+
+                    // Set the final values so onTrigger can figure out what happened to message
+                    message.setReturnCode(returnCode.getCode());
+                    message.setAcknowledged();
+                    throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+                } else if(message.getReturnCode() != null) {
+                    // No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger.
+                    final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode());
+
+                    if(returnCode.isError()){
+                        message.setAcknowledged();
+                        throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
+                    }
+                } else {
+                    // onTrigger successfully processed the data.
+                    // No need to check if over server timeout because we already processed the data. Might as well finalize it.
+                    // Set the final values so onTrigger can figure out what happened to message
+                    message.setReturnCode(250);
+                    message.setAcknowledged();
+                }
+                // Exit, allowing Handler to acknowledge the message
+                message.notifyAll();
             }
-
-            // Set the final values so onTrigger can figure out what happened to message
-            message.setReturnCode(250);
-            message.setAcknowledged();
-            // Exit, allowing Handler to acknowledge the message
-    }
+        }
 
         @Override
         public void done() {
             logger.trace("Called the last method of message handler. Exiting");
+            // No notification is issued from here; this method only writes the trace log above.
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
index 5328a0d..b9c0d60 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java
@@ -79,5 +79,15 @@ public enum SMTPResultCode {
         return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
     }
 
-
+    /**
+     * Tells whether this result code denotes an error.
+     * Only the four constants compared below are reported as errors.
+     * @return {@code true} for those four constants, {@code false} for every other one
+     */
+    public boolean isError(){
+        return this == MESSAGE_TOO_LARGE
+                || this == UNEXPECTED_ERROR
+                || this == QUEUE_ERROR
+                || this == TIMEOUT_ERROR;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e224c28/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
index 983ac4a..1fd7628 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestListenSMTP {
 
@@ -251,7 +252,7 @@ public class TestListenSMTP {
         listenSmtp.startShutdown();
     }
 
-    @Test(timeout=15000, expected=EmailException.class)
+    @Test(timeout=15000)
     public void emailTooLarge() throws Exception {
         ListenSMTP listenSmtp = new ListenSMTP();
         final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
@@ -272,31 +273,47 @@ public class TestListenSMTP {
         listenSmtp.initializeSMTPServer(context);
 
         final int port = listenSmtp.getPort();
+        AtomicBoolean finished = new AtomicBoolean(false);;
+        AtomicBoolean failed = new AtomicBoolean(false);
 
-        Email email = new SimpleEmail();
-        email.setHostName("127.0.0.1");
-        email.setSmtpPort(port);
-        email.setStartTLSEnabled(false);
-        email.setFrom("alice@nifi.apache.org");
-        email.setSubject("This is a test");
-        email.setMsg("Test test test chocolate");
-        email.addTo("bob@nifi.apache.org");
-        email.send();
+        try {
+            final Thread clientThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Email email = new SimpleEmail();
+                        email.setHostName("127.0.0.1");
+                        email.setSmtpPort(port);
+                        email.setStartTLSEnabled(false);
+                        email.setFrom("alice@nifi.apache.org");
+                        email.setSubject("This is a test");
+                        email.setMsg("Test test test chocolate");
+                        email.addTo("bob@nifi.apache.org");
+                        email.send();
 
-        Thread.sleep(100);
+                    } catch (final EmailException t) {
+                        failed.set(true);
+                    }
+                    finished.set(true);
+                }
+            });
+            clientThread.start();
 
+            while (!finished.get()) {
+                // process the request.
+                listenSmtp.onTrigger(context, processSessionFactory);
+                Thread.sleep(10);
+            }
+            clientThread.stop();
 
-        // process the request.
-        listenSmtp.onTrigger(context, processSessionFactory);
+            Assert.assertTrue("Sending email succeeded when it should have failed", failed.get());
 
-        runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
-        runner.assertQueueEmpty();
+            runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
 
-        try {
-                listenSmtp.startShutdown();
-        } catch (InterruptedException e) {
-                e.printStackTrace();
-                Assert.assertFalse(e.toString(), true);
+            runner.assertQueueEmpty();
+        } finally {
+            // shut down the server
+            listenSmtp.startShutdown();
         }
     }
 }
\ No newline at end of file