You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/03 04:23:59 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2] Add Email sink connector (#2304)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 96f2a15e4 [Connector-V2] Add Email sink connector (#2304)
96f2a15e4 is described below

commit 96f2a15e4d30db38b24266d5253c7e7b64db22ec
Author: liuyehan <81...@users.noreply.github.com>
AuthorDate: Wed Aug 3 12:23:55 2022 +0800

    [Connector-V2] Add Email sink connector (#2304)
    
    * 0
    
    * Update pom.xml
    
    add kudu dependency
    
    * Update plugin-mapping.properties
    
    add kudu config
    
    * Update pom.xml
    
    add kudu
    
    * add email sink connector
    
    * Delete seatunnel-connectors-v2/connector-email directory
    
    * Update plugin-mapping.properties
    
    * Update plugin-mapping.properties
    
    * Update pom.xml
    
    * Create pom.xml
    
    * [Connector-V2] Add Kudu source and sink connector
    
    * [Connector-V2] Add Kudu source and sink connector
    
    * [Connector-V2] Add Email sink connector
    
    * Update pom.xml
    
    * Update pom.xml
    
    * Delete seatunnel-connectors-v2/connector-kudu directory
    
    * Update pom.xml
    
    * Update plugin-mapping.properties
    
    * Update pom.xml
    
    * [Connector-V2] update codestyle pom.xml
    
    * [Connector-V2] update config
    
    * [Connector-V2] update license
    
    * [Connector-V2] update email package version 1.5.6
    
    * [Connector-V2] fix problem on code review
    
    * [Connector-V2] add Email usage document
    
    * Update Email.md
---
 docs/en/connector-v2/source/Email.md               |  71 +++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |  10 +-
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../{ => connector-email}/pom.xml                  |  50 +++----
 .../seatunnel/email/config/EmailSinkConfig.java    |  71 +++++++++
 .../connectors/seatunnel/email/sink/EmailSink.java |  63 ++++++++
 .../seatunnel/email/sink/EmailSinkWriter.java      | 158 +++++++++++++++++++++
 .../main/resources/fake_to_emailsink_flink.conf    |  63 ++++++++
 seatunnel-connectors-v2/pom.xml                    |   3 +-
 10 files changed, 459 insertions(+), 36 deletions(-)

diff --git a/docs/en/connector-v2/source/Email.md b/docs/en/connector-v2/source/Email.md
new file mode 100644
index 000000000..0ae3706c5
--- /dev/null
+++ b/docs/en/connector-v2/source/Email.md
@@ -0,0 +1,71 @@
+# Email
+
+## Description
+
+Send the data as a file to email.
+
+ The tested email version is 1.5.6.
+
+## Options
+
+| name                     | type    | required | default value |
+|--------------------------|---------|----------|---------------|
+| email_from_address             | string  | yes      | -             |
+| email_to_address               | string  | yes      | -             |
+| email_host               | string  | yes      | -             |
+| email_transport_protocol             | string  | yes      | -             |
+| email_smtp_auth               | string  | yes      | -             |
+| email_authorization_code               | string  | yes      | -             |
+| email_message_headline             | string  | yes      | -             |
+| email_message_content               | string  | yes      | -             |
+
+
+### email_from_address [string]
+
+Sender Email Address .
+
+### email_to_address [string]
+
+Address to receive mail.
+
+### email_host [string]
+
+SMTP server to connect to.
+
+### email_transport_protocol [string]
+
+The protocol to load the session .
+
+### email_smtp_auth [string]
+
+Whether to authenticate the customer.
+
+### email_authorization_code [string]
+
+authorization code,You can obtain the authorization code from the mailbox Settings.
+
+### email_message_headline [string]
+
+The subject line of the entire message.
+
+### email_message_content [string]
+
+The body of the entire message.
+
+
+## Example
+
+```bash
+
+ EmailSink {
+      email_from_address = "xxxxxx@qq.com"
+      email_to_address = "xxxxxx@163.com"
+      email_host="smtp.qq.com"
+      email_transport_protocol="smtp"
+      email_smtp_auth="true"
+      email_authorization_code=""
+      email_message_headline=""
+      email_message_content=""
+   }
+
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 16b9a0882..5bc85ec72 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -102,6 +102,7 @@ seatunnel.sink.Clickhouse = connector-clickhouse
 seatunnel.sink.ClickhouseFile = connector-clickhouse
 seatunnel.source.Jdbc = connector-jdbc
 seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.sink.Email = connector-email
 seatunnel.sink.HdfsFile = connector-file-hadoop
 seatunnel.sink.LocalFile = connector-file-local
 seatunnel.source.Pulsar = connector-pulsar
diff --git a/pom.xml b/pom.xml
index 4834d90e5..39927e48b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <mongo-spark.version>2.2.0</mongo-spark.version>
         <spark-redis.version>2.6.0</spark-redis.version>
         <commons-lang3.version>3.4</commons-lang3.version>
+        <email.version>1.5.6</email.version>
         <commons-collections4.version>4.4</commons-collections4.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
         <spark.scope>provided</spark.scope>
@@ -263,7 +264,12 @@
                 <artifactId>lz4</artifactId>
                 <version>1.3.0</version>
             </dependency>
-
+            <!--email -->
+            <dependency>
+                <groupId>com.sun.mail</groupId>
+                <artifactId>javax.mail</artifactId>
+                <version>${email.version}</version>
+            </dependency>
             <!--flink-->
             <dependency>
                 <groupId>org.apache.flink</groupId>
@@ -1322,4 +1328,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index e8edcd953..dd8d105bb 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -101,6 +101,11 @@
             <artifactId>connector-dingtalk</artifactId>
             <version>${project.version}</version>
         </dependency>
+          <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-email</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-email/pom.xml
similarity index 53%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-email/pom.xml
index 325325cf0..80414be3e 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-email/pom.xml
@@ -17,49 +17,33 @@
     limitations under the License.
 
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel</artifactId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <artifactId>seatunnel-connectors-v2</artifactId>
 
-    <modules>
-        <module>connector-common</module>
-        <module>connector-clickhouse</module>
-        <module>connector-console</module>
-        <module>connector-fake</module>
-        <module>connector-http</module>
-        <module>connector-jdbc</module>
-        <module>connector-kafka</module>
-        <module>connector-pulsar</module>
-        <module>connector-socket</module>
-        <module>connector-hive</module>
-        <module>connector-file</module>
-        <module>connector-hudi</module>
-        <module>connector-assert</module>
-        <module>connector-dingtalk</module>
-    </modules>
+    <artifactId>connector-email</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.mail</groupId>
+            <artifactId>javax.mail</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
new file mode 100644
index 000000000..f7b69c1ff
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.email.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+public class EmailSinkConfig {
+
+    private static final String EMAIL_FROM_ADDRESS = "email_from_address";
+    private static final String EMAIL_TO_ADDRESS = "email_to_address";
+    private static final String EMAIL_AUTHORIZATION_CODE = "email_authorization_code";
+    private static final String EMAIL_MESSAGE_HEADLINE = "email_message_headline";
+    private static final String EMAIL_MESSAGE_CONTENT = "email_message_content";
+    private static final String EMAIL_HOST = "email_host";
+    private static final String EMAIL_TRANSPORT_PROTOCOL = "email_transport_protocol";
+    private static final String EMAIL_SMTP_AUTH = "email_smtp_auth";
+    private String emailFromAddress;
+    private String emailToAddress;
+    private String emailAuthorizationCode;
+    private String emailMessageHeadline;
+    private String emailMessageContent;
+    private String emailHost;
+    private String emailTransportProtocol;
+    private String emailSmtpAuth;
+
+    public EmailSinkConfig(@NonNull Config pluginConfig) {
+        if (pluginConfig.hasPath(EMAIL_FROM_ADDRESS)) {
+            this.emailFromAddress = pluginConfig.getString(EMAIL_FROM_ADDRESS);
+        }
+        if (pluginConfig.hasPath(EMAIL_TO_ADDRESS)) {
+            this.emailToAddress = pluginConfig.getString(EMAIL_TO_ADDRESS);
+        }
+        if (pluginConfig.hasPath(EMAIL_AUTHORIZATION_CODE)) {
+            this.emailAuthorizationCode = pluginConfig.getString(EMAIL_AUTHORIZATION_CODE);
+        }
+        if (pluginConfig.hasPath(EMAIL_MESSAGE_HEADLINE)) {
+            this.emailMessageHeadline = pluginConfig.getString(EMAIL_MESSAGE_HEADLINE);
+        }
+        if (pluginConfig.hasPath(EMAIL_MESSAGE_CONTENT)) {
+            this.emailMessageContent = pluginConfig.getString(EMAIL_MESSAGE_CONTENT);
+        }
+        if (pluginConfig.hasPath(EMAIL_HOST)) {
+            this.emailHost = pluginConfig.getString(EMAIL_HOST);
+        }
+        if (pluginConfig.hasPath(EMAIL_TRANSPORT_PROTOCOL)) {
+            this.emailTransportProtocol = pluginConfig.getString(EMAIL_TRANSPORT_PROTOCOL);
+        }
+        if (pluginConfig.hasPath(EMAIL_SMTP_AUTH)) {
+            this.emailSmtpAuth = pluginConfig.getString(EMAIL_SMTP_AUTH);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
new file mode 100644
index 000000000..bd4860542
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.email.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
+        return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "EmailSink";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
new file mode 100644
index 000000000..1a86dce22
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.email.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.sun.mail.util.MailSSLSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.activation.FileDataSource;
+import javax.mail.Authenticator;
+import javax.mail.BodyPart;
+import javax.mail.Message;
+import javax.mail.Multipart;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmailSinkWriter.class);
+
+    private final SeaTunnelRowType seaTunnelRowType;
+    private EmailSinkConfig config;
+    private StringBuffer stringBuffer;
+
+    public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.config = new EmailSinkConfig(pluginConfig);
+        this.stringBuffer = new StringBuffer();
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) {
+        Object[] fields = element.getFields();
+
+        for (Object field : fields) {
+            stringBuffer.append(field.toString() + ",");
+        }
+        stringBuffer.deleteCharAt(fields.length - 1);
+        stringBuffer.append("\n");
+
+    }
+
+    @Override
+    public void close() {
+        createFile();
+        Properties properties = new Properties();
+
+        properties.setProperty("mail.host", config.getEmailHost());
+
+        properties.setProperty("mail.transport.protocol", config.getEmailTransportProtocol());
+
+        properties.setProperty("mail.smtp.auth", config.getEmailSmtpAuth());
+
+        try {
+            MailSSLSocketFactory sf = new MailSSLSocketFactory();
+            sf.setTrustAllHosts(true);
+            properties.put("mail.smtp.ssl.enable", "true");
+            properties.put("mail.smtp.ssl.socketFactory", sf);
+            Session session = Session.getDefaultInstance(properties, new Authenticator() {
+                @Override
+                protected PasswordAuthentication getPasswordAuthentication() {
+                    return new PasswordAuthentication(config.getEmailFromAddress(), config.getEmailAuthorizationCode());
+                }
+            });
+            //Create the default MimeMessage object
+            MimeMessage message = new MimeMessage(session);
+
+            // Set the email address
+            message.setFrom(new InternetAddress(config.getEmailFromAddress()));
+
+            // Set the recipient email address
+            message.addRecipient(Message.RecipientType.TO,
+                    new InternetAddress(config.getEmailToAddress()));
+
+            // Setting the Email subject
+            message.setSubject(config.getEmailMessageHeadline());
+
+            // Create Message
+            BodyPart messageBodyPart = new MimeBodyPart();
+
+            // Set Message content
+            messageBodyPart.setText(config.getEmailMessageContent());
+
+            // Create multiple messages
+            Multipart multipart = new MimeMultipart();
+            // Set up the text message section
+            multipart.addBodyPart(messageBodyPart);
+            // accessory
+            messageBodyPart = new MimeBodyPart();
+            String filename = "emailsink.csv";
+            DataSource source = new FileDataSource(filename);
+            messageBodyPart.setDataHandler(new DataHandler(source));
+            messageBodyPart.setFileName(filename);
+            multipart.addBodyPart(messageBodyPart);
+            message.setContent(multipart);
+
+            //   send a message
+            Transport.send(message);
+            LOGGER.info("Sent message successfully....");
+        } catch (Exception e) {
+            LOGGER.warn("send email Fail.", e);
+            throw new RuntimeException("send email Fail.", e);
+        }
+    }
+
+    public void createFile() {
+        try {
+            String data = stringBuffer.toString();
+            File file = new File("emailsink.csv");
+            //if file doesnt exists, then create it
+            if (!file.exists()) {
+                file.createNewFile();
+            }
+            FileWriter fileWritter = new FileWriter(file.getName());
+            fileWritter.write(data);
+            fileWritter.close();
+            LOGGER.info("Create File successfully....");
+        } catch (IOException e) {
+            LOGGER.warn("Create File Fail.", e);
+            throw new RuntimeException("Create File Fail.", e);
+        }
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-email/src/main/resources/fake_to_emailsink_flink.conf b/seatunnel-connectors-v2/connector-email/src/main/resources/fake_to_emailsink_flink.conf
new file mode 100644
index 000000000..8655a11d3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-email/src/main/resources/fake_to_emailsink_flink.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    FakeSource {
+       result_table_name = "fake"
+       field_name = "name,age"
+     }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+         sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ EmailSink {
+      email_from_address = "xxxxxx@qq.com"
+      email_to_address = "xxxxxx@163.com"
+      email_host="smtp.qq.com"
+      email_transport_protocol="smtp"
+      email_smtp_auth="true"
+      email_authorization_code=""
+      email_message_headline="这个是标题"
+      email_message_content="这个是内容"
+   }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 325325cf0..b5ed52de7 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
         <module>connector-file</module>
         <module>connector-hudi</module>
         <module>connector-assert</module>
+        <module>connector-email</module>
         <module>connector-dingtalk</module>
     </modules>
 
@@ -62,4 +63,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>