You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/11/10 22:23:05 UTC

[nifi] branch main updated: NIFI-9308: Added EmailRecordSink

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f05443d  NIFI-9308: Added EmailRecordSink
f05443d is described below

commit f05443ddf7c044f1f718c6fc2dfe1ecc8498718e
Author: Lehel <Le...@hotmail.com>
AuthorDate: Wed Oct 20 16:27:56 2021 +0200

    NIFI-9308: Added EmailRecordSink
    
    This closes #5471
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-record-sink-service/pom.xml               |   5 +
 .../apache/nifi/record/sink/EmailRecordSink.java   | 368 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../nifi/record/sink/TestEmailRecordSink.java      | 147 ++++++++
 4 files changed, 521 insertions(+)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
index 071ee31..6875c71 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
@@ -64,5 +64,10 @@
             <artifactId>nifi-mock-record-utils</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.sun.mail</groupId>
+            <artifactId>jakarta.mail</artifactId>
+            <version>2.0.1</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/EmailRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/EmailRecordSink.java
new file mode 100644
index 0000000..7006272
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/EmailRecordSink.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink;
+
+import jakarta.mail.Authenticator;
+import jakarta.mail.Message;
+import jakarta.mail.MessagingException;
+import jakarta.mail.PasswordAuthentication;
+import jakarta.mail.Session;
+import jakarta.mail.Transport;
+import jakarta.mail.internet.AddressException;
+import jakarta.mail.internet.InternetAddress;
+import jakarta.mail.internet.MimeMessage;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Tags({"email", "smtp", "record", "sink", "send", "write"})
+@CapabilityDescription("Provides a RecordSinkService that can be used to send records in email using the specified writer for formatting.")
+public class EmailRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    /** Shared suffix appended to the description of every address-valued property. */
+    private static final String RFC822 = "Comma separated sequence of addresses following RFC822 syntax.";
+
+    // ---- Message envelope properties -------------------------------------------------------
+
+    /** Sender address (required). */
+    public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
+            .name("from")
+            .displayName("From")
+            .description("Specifies the Email address to use as the sender. " + RFC822)
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /** Optional To recipients; at least one of To/CC/BCC is enforced by customValidate. */
+    public static final PropertyDescriptor TO = new PropertyDescriptor.Builder()
+            .name("to")
+            .displayName("To")
+            .description("The recipients to include in the To-Line of the email. " + RFC822)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /** Optional CC recipients. */
+    public static final PropertyDescriptor CC = new PropertyDescriptor.Builder()
+            .name("cc")
+            .displayName("CC")
+            .description("The recipients to include in the CC-Line of the email. " + RFC822)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /** Optional BCC recipients. */
+    public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder()
+            .name("bcc")
+            .displayName("BCC")
+            .description("The recipients to include in the BCC-Line of the email. " + RFC822)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /** Subject line of the outgoing message. */
+    public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
+            .name("subject")
+            .displayName("Subject")
+            .description("The email subject")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("Message from NiFi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // ---- SMTP connection properties --------------------------------------------------------
+
+    /**
+     * SMTP server host. Declares Expression Language support because the value is read through
+     * evaluateAttributeExpressions() like every other mapped mail property.
+     */
+    public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
+            .name("smtp-hostname")
+            .displayName("SMTP Hostname")
+            .description("The hostname of the SMTP Server that is used to send Email Notifications")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    /** SMTP server port; also used for the smtps and socket-factory port mail properties. */
+    public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
+            .name("smtp-port")
+            .displayName("SMTP Port")
+            .description("The Port used for SMTP communications")
+            .required(true)
+            .defaultValue("25")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    /** Whether the mail session authenticates; gates the username and password properties. */
+    public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
+            .name("smtp-auth")
+            .displayName("SMTP Auth")
+            .description("Flag indicating whether authentication should be used")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("true")
+            .build();
+    /** Account name, only applicable when SMTP Auth is "true". */
+    public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
+            .name("smtp-username")
+            .displayName("SMTP Username")
+            .description("Username for the SMTP account")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .dependsOn(SMTP_AUTH, "true")
+            .build();
+    /** Account password (sensitive), only applicable when SMTP Auth is "true". */
+    public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
+            .name("smtp-password")
+            .displayName("SMTP Password")
+            .description("Password for the SMTP account")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .dependsOn(SMTP_AUTH, "true")
+            .build();
+    /** Maps to the mail.smtp.starttls.enable session property. */
+    public static final PropertyDescriptor SMTP_STARTTLS = new PropertyDescriptor.Builder()
+            .name("smtp-starttls")
+            .displayName("SMTP STARTTLS")
+            .description("Flag indicating whether STARTTLS should be enabled. "
+                    + "If the server does not support STARTTLS, the connection continues without the use of TLS")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+    /**
+     * Maps to the mail.smtp.ssl.enable session property. Validated as a boolean, like the other
+     * flag properties, so that values other than true/false are rejected at configuration time.
+     */
+    public static final PropertyDescriptor SMTP_SSL = new PropertyDescriptor.Builder()
+            .name("smtp-ssl")
+            .displayName("SMTP SSL")
+            .description("Flag indicating whether SSL should be enabled")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+    /** Value written to the X-Mailer header of every outgoing message. */
+    public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
+            .name("smtp-xmailer-header")
+            .displayName("SMTP X-Mailer Header")
+            .description("X-Mailer used in the header of the outgoing email")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("NiFi")
+            .build();
+
+    /** Writer factory resolved from RECORD_WRITER_FACTORY when the service is enabled. */
+    private volatile RecordSetWriterFactory writerFactory;
+
+    /**
+     * Lookup table from mail session property names to the NiFi PropertyDescriptors whose
+     * values are evaluated at runtime to populate them.
+     */
+    private static final Map<String, PropertyDescriptor> propertyToContext = createMailPropertyMapping();
+
+    /**
+     * Builds the mail-property-name to descriptor table. The SMTP port descriptor intentionally
+     * backs three different mail properties.
+     *
+     * @return the populated mapping
+     */
+    private static Map<String, PropertyDescriptor> createMailPropertyMapping() {
+        final Map<String, PropertyDescriptor> mapping = new HashMap<>();
+        mapping.put("mail.smtp.host", SMTP_HOSTNAME);
+        mapping.put("mail.smtp.port", SMTP_PORT);
+        mapping.put("mail.smtps.port", SMTP_PORT);
+        mapping.put("mail.smtp.socketFactory.port", SMTP_PORT);
+        mapping.put("mail.smtp.ssl.enable", SMTP_SSL);
+        mapping.put("mail.smtp.auth", SMTP_AUTH);
+        mapping.put("mail.smtp.starttls.enable", SMTP_STARTTLS);
+        mapping.put("mail.smtp.user", SMTP_USERNAME);
+        mapping.put("mail.smtp.password", SMTP_PASSWORD);
+        return mapping;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                FROM,
+                TO,
+                CC,
+                BCC,
+                SUBJECT,
+                SMTP_HOSTNAME,
+                SMTP_PORT,
+                SMTP_AUTH,
+                SMTP_USERNAME,
+                SMTP_PASSWORD,
+                SMTP_STARTTLS,
+                SMTP_SSL,
+                HEADER_XMAILER,
+                RecordSinkService.RECORD_WRITER_FACTORY
+        ));
+    }
+
+    /**
+     * Adds a cross-property check on top of the inherited validation: at least one of the
+     * To, CC or BCC properties must carry a value.
+     *
+     * @param context the validation context holding the configured property values
+     * @return the inherited validation results plus an error when no recipient is configured
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
+
+        final String to = context.getProperty(TO).getValue();
+        final String cc = context.getProperty(CC).getValue();
+        final String bcc = context.getProperty(BCC).getValue();
+
+        // Whitespace-only values pass the NON_EMPTY validator but cannot yield an address,
+        // so they are treated the same as an unset property here.
+        if (StringUtils.isBlank(to) && StringUtils.isBlank(cc) && StringUtils.isBlank(bcc)) {
+            errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
+        }
+
+        return errors;
+    }
+
+    /**
+     * Looks up the configured record writer controller service and stores it for later use
+     * by sendData.
+     *
+     * @param context the configuration the service is being enabled with
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final RecordSetWriterFactory configuredFactory = context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)
+                .asControllerService(RecordSetWriterFactory.class);
+        this.writerFactory = configuredFactory;
+    }
+
+    /**
+     * Serializes the record set with the configured writer and mails the result as the
+     * message body.
+     *
+     * @param recordSet       the records to write
+     * @param attributes      attributes handed to the writer factory
+     * @param sendZeroResults when {@code false}, no email is sent if the record set turned out
+     *                        to be empty; when {@code true} a message is sent regardless
+     * @return the writer's result for the record set
+     * @throws IOException      if writing the records fails
+     * @throws ProcessException if the writer cannot be created because the schema is not found
+     */
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
+                writer.beginRecordSet();
+                Record r;
+                while ((r = recordSet.next()) != null) {
+                    writer.write(r);
+                    writer.flush();
+                }
+                writeResult = writer.finishRecordSet();
+                writer.flush();
+            }
+
+            // The writer is closed at this point, so the buffer holds its complete output.
+            if (sendZeroResults || writeResult.getRecordCount() > 0) {
+                // Decode with an explicit charset instead of the platform default.
+                // NOTE(review): assumes the configured writer emits UTF-8 - confirm for writers
+                // that allow a different character set.
+                final String messageText = new String(out.toByteArray(), StandardCharsets.UTF_8);
+                sendMessage(getConfigurationContext(), messageText);
+            }
+        } catch (SchemaNotFoundException e) {
+            final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
+                    recordSet.getSchema().getSchemaName());
+            throw new ProcessException(errorMessage, e);
+        }
+        return writeResult;
+    }
+
+    /**
+     * Hands the message to the static {@link Transport#send(Message)}. Kept as an overridable
+     * instance method so that subclasses (for example in tests) can intercept delivery.
+     *
+     * @param msg the message to send
+     * @throws MessagingException on error
+     */
+    protected void send(final Message msg) throws MessagingException {
+        Transport.send(msg);
+    }
+
+    /**
+     * Builds a plain-text MIME message from the configured properties and the given body, then
+     * passes it to {@link #send(Message)}.
+     *
+     * @param context     the configuration to read sender, recipients, subject and headers from
+     * @param messageText the body of the email
+     * @throws RuntimeException if an address cannot be resolved or the message cannot be sent;
+     *                          the message names the SMTP host and port and the cause is preserved
+     */
+    private void sendMessage(final ConfigurationContext context, final String messageText) {
+        final Properties properties = getMailProperties(context);
+        final Session mailSession = createMailSession(properties);
+        final Message message = new MimeMessage(mailSession);
+
+        try {
+            // Guard the sender lookup: a From expression that evaluates to blank would otherwise
+            // escape as an unchecked NullPointerException/ArrayIndexOutOfBoundsException.
+            final String from = context.getProperty(FROM).evaluateAttributeExpressions().getValue();
+            final InternetAddress[] fromAddresses = StringUtils.isBlank(from) ? new InternetAddress[0] : InternetAddress.parse(from);
+            if (fromAddresses.length == 0) {
+                throw new AddressException("Required property '" + FROM.getDisplayName() + "' does not evaluate to a valid address.");
+            }
+            // Only the first parsed address is used as the sender
+            message.setFrom(fromAddresses[0]);
+
+            final InternetAddress[] toAddresses = toInternetAddresses(context, TO);
+            message.setRecipients(Message.RecipientType.TO, toAddresses);
+
+            final InternetAddress[] ccAddresses = toInternetAddresses(context, CC);
+            message.setRecipients(Message.RecipientType.CC, ccAddresses);
+
+            final InternetAddress[] bccAddresses = toInternetAddresses(context, BCC);
+            message.setRecipients(Message.RecipientType.BCC, bccAddresses);
+
+            message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions().getValue());
+            message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions().getValue());
+
+            // Without a charset parameter the text/plain handler encodes the body as US-ASCII,
+            // which mangles any non-ASCII record data.
+            message.setContent(messageText, "text/plain; charset=UTF-8");
+            message.setSentDate(new Date());
+
+            send(message);
+        } catch (final ProcessException | MessagingException e) {
+            final String errorMessage = String.format("Send Failed using SMTP Host [%s] Port [%s]", properties.get("mail.smtp.host"), properties.get("mail.smtp.port"));
+            throw new RuntimeException(errorMessage, e);
+        }
+    }
+
+    /**
+     * Evaluates every descriptor in the mail-property mapping and collects the non-blank
+     * results into the Properties object used to create the mail session.
+     *
+     * @param context context
+     * @return mail properties
+     */
+    private Properties getMailProperties(final ConfigurationContext context) {
+        final Properties mailProperties = new Properties();
+
+        for (final Map.Entry<String, PropertyDescriptor> mapping : propertyToContext.entrySet()) {
+            final PropertyDescriptor descriptor = mapping.getValue();
+            final String evaluated = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+
+            if (StringUtils.isBlank(evaluated)) {
+                // Properties cannot hold null values; blank results are left out as well
+                continue;
+            }
+            mailProperties.setProperty(mapping.getKey(), evaluated);
+        }
+
+        return mailProperties;
+    }
+
+    /**
+     * Based on the input properties, determine whether an authenticate or unauthenticated session should be used. If authenticated, creates a Password Authenticator for use in sending the email.
+     *
+     * @param properties mail properties
+     * @return session
+     */
+    private Session createMailSession(final Properties properties) {
+        final boolean auth = Boolean.parseBoolean(properties.getProperty("mail.smtp.auth"));
+        return auth ? Session.getInstance(properties, new Authenticator() {
+            @Override
+            public PasswordAuthentication getPasswordAuthentication() {
+                final String username = properties.getProperty("mail.smtp.user");
+                final String password = properties.getProperty("mail.smtp.password");
+                return new PasswordAuthentication(username, password);
+            }
+        }) : Session.getInstance(properties); // without auth
+    }
+
+    /**
+     * Evaluates an address property and parses it into addresses.
+     *
+     * @param context            the current context
+     * @param propertyDescriptor the property to evaluate
+     * @return an InternetAddress[] parsed from the supplied property; an empty array when an
+     *         optional property evaluates to a blank value
+     * @throws AddressException if a required property evaluates to a blank value, or if the value
+     *                          cannot be parsed; in the latter case the parser's exception is
+     *                          chained and available through {@code getCause()}
+     */
+    private InternetAddress[] toInternetAddresses(final ConfigurationContext context, PropertyDescriptor propertyDescriptor) throws AddressException {
+        final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue();
+
+        // Whitespace-only values carry no address and are handled like an empty value
+        if (StringUtils.isBlank(value)) {
+            if (propertyDescriptor.isRequired()) {
+                final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string.";
+                throw new AddressException(exceptionMsg);
+            }
+            return new InternetAddress[0];
+        }
+
+        try {
+            return InternetAddress.parse(value);
+        } catch (AddressException e) {
+            final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '" + value + "'";
+            final AddressException wrapped = new AddressException(exceptionMsg);
+            // AddressException has no cause constructor; chain the original so the parser's
+            // detail (offending token, position) is not lost.
+            wrapped.setNextException(e);
+            throw wrapped;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index ec4250c..3297253 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,3 +14,4 @@
 # limitations under the License.
 org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
 org.apache.nifi.record.sink.LoggingRecordSink
+org.apache.nifi.record.sink.EmailRecordSink
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestEmailRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestEmailRecordSink.java
new file mode 100644
index 0000000..77fc949
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestEmailRecordSink.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink;
+
+import jakarta.mail.Message;
+import jakarta.mail.MessagingException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestEmailRecordSink {
+
+    /**
+     * Extension of EmailRecordSink that stubs out the calls to
+     * Transport.sendMessage().
+     *
+     * <p>
+     * All sent messages are records in a list available via the
+     * {@link #getMessages()} method.</p>
+     * <p> Calling
+     * {@link #setException(MessagingException)} will cause the supplied exception to be
+     * thrown when sendMessage is invoked.
+     * </p>
+     */
+    private static final class EmailRecordSinkExtension extends EmailRecordSink {
+        private MessagingException e;
+        private final ArrayList<Message> messages = new ArrayList<>();
+
+        @Override
+        protected void send(Message msg) throws MessagingException {
+            messages.add(msg);
+            if (this.e != null) {
+                throw e;
+            }
+        }
+
+        void setException(final MessagingException e) {
+            this.e = e;
+        }
+
+        List<Message> getMessages() {
+            return messages;
+        }
+    }
+
+    private EmailRecordSinkExtension recordSink;
+    private RecordSet recordSet;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+        runner.setValidateExpressionUsage(false);
+
+        recordSink = new EmailRecordSinkExtension();
+        runner.addControllerService("emailRecordSink", recordSink);
+
+        MockRecordWriter writerFactory = new MockRecordWriter();
+        runner.addControllerService("writer", writerFactory);
+        runner.setProperty(recordSink, EmailRecordSink.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(recordSink, EmailRecordSink.SMTP_HOSTNAME, "localhost");
+        runner.setProperty(recordSink, EmailRecordSink.FROM, "sender@test.com");
+        runner.setProperty(recordSink, EmailRecordSink.TO, "receiver@test.com");
+
+        runner.enableControllerService(writerFactory);
+        runner.enableControllerService(recordSink);
+
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("a", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("b", RecordFieldType.BOOLEAN.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap1 = new HashMap<>();
+        valueMap1.put("a", "Hello");
+        valueMap1.put("b", true);
+        final Record record1 = new MapRecord(schema, valueMap1);
+
+        final Map<String, Object> valueMap2 = new HashMap<>();
+        valueMap2.put("a", "World");
+        valueMap2.put("b", false);
+        final Record record2 = new MapRecord(schema, valueMap2);
+
+        recordSet = RecordSet.of(schema, record1, record2);
+    }
+
+    @Test
+    public void testRecordIsSentByEmail() throws IOException, MessagingException {
+        final WriteResult writeResult = recordSink.sendData(recordSet, Collections.emptyMap(), false);
+        final List<Message> messages = recordSink.getMessages();
+        final String expectedMessage = "\"Hello\",\"true\"\n\"World\",\"false\"\n";
+        assertNotNull(writeResult);
+        assertEquals(2, writeResult.getRecordCount());
+        assertEquals(1, messages.size());
+        assertEquals(expectedMessage, messages.get(0).getDataHandler().getContent());
+    }
+
+    @Test
+    public void testSendEmailThrowsException() {
+        final String exceptionMessage = "test exception message";
+        recordSink.setException(new MessagingException(exceptionMessage));
+        final RuntimeException runtimeException = assertThrows(RuntimeException.class,
+                () -> recordSink.sendData(recordSet, Collections.emptyMap(), false)
+        );
+        assertTrue(runtimeException.getMessage().contains("localhost"));
+        assertTrue(runtimeException.getMessage().contains("25"));
+        assertEquals(exceptionMessage, runtimeException.getCause().getMessage());
+    }
+}
+