You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/28 11:03:36 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Socket Connector Sink (#2549)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 94f4600a4 [Feature][Connector-V2] Socket Connector Sink (#2549)
94f4600a4 is described below

commit 94f4600a4e135325be439302fe3a875ebb7f5b30
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Sun Aug 28 19:03:31 2022 +0800

    [Feature][Connector-V2] Socket Connector Sink (#2549)
    
    * [Feature][Connector-V2] Add Socket Connector Sink
    
    * add License
    
    * fix code style
    
    * fix code style
    
    Co-authored-by: 毕博 <bi...@mafengwo.com>
---
 docs/en/connector-v2/sink/Socket.md                |  89 +++++++++++
 plugin-mapping.properties                          |   2 +
 seatunnel-connectors-v2/connector-socket/pom.xml   |   6 +
 .../seatunnel/socket/config/SinkConfig.java        |  43 +++++
 .../seatunnel/socket/sink/SocketClient.java        | 178 +++++++++++++++++++++
 .../seatunnel/socket/sink/SocketSink.java          |  74 +++++++++
 .../seatunnel/socket/sink/SocketSinkWriter.java    |  50 ++++++
 7 files changed, 442 insertions(+)

diff --git a/docs/en/connector-v2/sink/Socket.md b/docs/en/connector-v2/sink/Socket.md
new file mode 100644
index 000000000..7339f7b01
--- /dev/null
+++ b/docs/en/connector-v2/sink/Socket.md
@@ -0,0 +1,89 @@
+# Socket
+
+> Socket sink connector
+
+## Description
+
+Used to send data to Socket Server. Both support streaming and batch mode.
+> For example, if the data from upstream is [`age: 12, name: jared`], the content send to socket server is the following: `{"name":"jared","age":17}`
+
+
+##  Options
+
+| name | type   | required | default value |
+| --- |--------|----------|---------------|
+| host | String | Yes       | -             |
+| port | Integer | yes      | -             |
+| max_retries | Integer | No       | 3             |
+
+### host [string]
+socket server host
+
+### port [integer]
+
+socket server port
+
+### max_retries [integer]
+
+The number of retries to send record failed
+
+## Example
+
+simple:
+
+```hocon
+Socket {
+        host = "localhost"
+        port = 9999
+    }
+```
+
+test:
+
+* Configuring the SeaTunnel config file
+
+```hocon
+env {
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+    FakeSource {
+      result_table_name = "fake"
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+    }
+}
+
+transform {
+      sql = "select name, age from fake"
+}
+
+sink {
+    Socket {
+        host = "localhost"
+        port = 9999
+    }
+}
+
+```
+
+* Start a port listening
+
+```shell
+nc -l -v 9999
+```
+
+* Start a SeaTunnel task
+
+
+* Socket Server Console print data
+
+```text
+{"name":"jared","age":17}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 0efd44f1f..7d92053ad 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -120,3 +120,5 @@ seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
 seatunnel.sink.Neo4j = connector-neo4j
 seatunnel.sink.FtpFile = connector-file-ftp
+seatunnel.sink.Socket = connector-socket
+
diff --git a/seatunnel-connectors-v2/connector-socket/pom.xml b/seatunnel-connectors-v2/connector-socket/pom.xml
index dca9bf524..fab1c5f3e 100644
--- a/seatunnel-connectors-v2/connector-socket/pom.xml
+++ b/seatunnel-connectors-v2/connector-socket/pom.xml
@@ -35,6 +35,12 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
new file mode 100644
index 000000000..9e93336b7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SinkConfig implements Serializable {
+    public static final String HOST = "host";
+    public static final String PORT = "port";
+    private static final String MAX_RETRIES = "max_retries";
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private String host;
+    private int port;
+    private int maxNumRetries = DEFAULT_MAX_RETRIES;
+
+    public SinkConfig(Config config) {
+        this.host = config.getString(HOST);
+        this.port = config.getInt(PORT);
+        if (config.hasPath(MAX_RETRIES)) {
+            this.maxNumRetries = config.getInt(MAX_RETRIES);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
new file mode 100644
index 000000000..77a38b1c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+@Slf4j
+public class SocketClient {
+
+    private final String hostName;
+    private final int port;
+    private int retries;
+    private final int maxNumRetries;
+    private transient Socket client;
+    private transient OutputStream outputStream;
+    private final SerializationSchema serializationSchema;
+    private volatile boolean isRunning = Boolean.TRUE;
+    private static final int CONNECTION_RETRY_DELAY = 500;
+
+    public SocketClient(SinkConfig config, SerializationSchema serializationSchema) {
+        this.hostName = config.getHost();
+        this.port = config.getPort();
+        this.serializationSchema = serializationSchema;
+        retries = config.getMaxNumRetries();
+        maxNumRetries = config.getMaxNumRetries();
+    }
+
+    private void createConnection() throws IOException {
+        client = new Socket(hostName, port);
+        client.setKeepAlive(true);
+        client.setTcpNoDelay(true);
+
+        outputStream = client.getOutputStream();
+    }
+
+    public void open() throws IOException {
+        try {
+            synchronized (SocketClient.class) {
+                createConnection();
+            }
+        } catch (IOException e) {
+            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
+        }
+    }
+
+    public void wirte(SeaTunnelRow row) throws IOException {
+        byte[] msg = serializationSchema.serialize(row);
+        try {
+            outputStream.write(msg);
+            outputStream.flush();
+
+        } catch (IOException e) {
+            // if no re-tries are enable, fail immediately
+            if (maxNumRetries == 0) {
+                throw new IOException(
+                        "Failed to send message '"
+                                + row
+                                + "' to socket server at "
+                                + hostName
+                                + ":"
+                                + port
+                                + ". Connection re-tries are not enabled.",
+                        e);
+            }
+
+            log.error(
+                    "Failed to send message '"
+                            + row
+                            + "' to socket server at "
+                            + hostName
+                            + ":"
+                            + port
+                            + ". Trying to reconnect...",
+                    e);
+
+            synchronized (SocketClient.class) {
+                IOException lastException = null;
+                retries = 0;
+
+                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
+
+                    // first, clean up the old resources
+                    try {
+                        if (outputStream != null) {
+                            outputStream.close();
+                        }
+                    } catch (IOException ee) {
+                        log.error("Could not close output stream from failed write attempt", ee);
+                    }
+                    try {
+                        if (client != null) {
+                            client.close();
+                        }
+                    } catch (IOException ee) {
+                        log.error("Could not close socket from failed write attempt", ee);
+                    }
+
+                    // try again
+                    retries++;
+
+                    try {
+                        // initialize a new connection
+                        createConnection();
+                        outputStream.write(msg);
+                        return;
+                    } catch (IOException ee) {
+                        lastException = ee;
+                        log.error(
+                                "Re-connect to socket server and send message failed. Retry time(s): "
+                                        + retries,
+                                ee);
+                    }
+                    try {
+                        this.wait(CONNECTION_RETRY_DELAY);
+                    }
+                    catch (InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException(
+                                "unable to write; interrupted while doing another attempt", e);
+                    }
+                }
+
+                if (isRunning) {
+                    throw new IOException(
+                            "Failed to send message '"
+                                    + row
+                                    + "' to socket server at "
+                                    + hostName
+                                    + ":"
+                                    + port
+                                    + ". Failed after "
+                                    + retries
+                                    + " retries.",
+                            lastException);
+                }
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        isRunning = false;
+        synchronized (this) {
+            this.notifyAll();
+            try {
+                if (outputStream != null) {
+                    outputStream.close();
+                }
+            } finally {
+                if (client != null) {
+                    client.close();
+                }
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
new file mode 100644
index 000000000..3277ef518
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private Config pluginConfig;
+    private SinkConfig sinkConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "Socket";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SinkConfig.PORT, SinkConfig.HOST);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        sinkConfig = new SinkConfig(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+        return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
new file mode 100644
index 000000000..9241c9e57
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import java.io.IOException;
+
+public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final SocketClient socketClient;
+    private final SerializationSchema serializationSchema;
+    private final SinkConfig sinkConfig;
+
+    SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws IOException {
+        this.sinkConfig = sinkConfig;
+        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+        this.socketClient = new SocketClient(sinkConfig, serializationSchema);
+        socketClient.open();
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        socketClient.wirte(element);
+    }
+
+    @Override
+    public void close() throws IOException {
+        socketClient.close();
+    }
+}