You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/28 11:03:36 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Socket Connector Sink (#2549)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 94f4600a4 [Feature][Connector-V2] Socket Connector Sink (#2549)
94f4600a4 is described below
commit 94f4600a4e135325be439302fe3a875ebb7f5b30
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Sun Aug 28 19:03:31 2022 +0800
[Feature][Connector-V2] Socket Connector Sink (#2549)
* [Feature][Connector-V2] Add Socket Connector Sink
* add License
* fix code style
* fix code style
Co-authored-by: 毕博 <bi...@mafengwo.com>
---
docs/en/connector-v2/sink/Socket.md | 89 +++++++++++
plugin-mapping.properties | 2 +
seatunnel-connectors-v2/connector-socket/pom.xml | 6 +
.../seatunnel/socket/config/SinkConfig.java | 43 +++++
.../seatunnel/socket/sink/SocketClient.java | 178 +++++++++++++++++++++
.../seatunnel/socket/sink/SocketSink.java | 74 +++++++++
.../seatunnel/socket/sink/SocketSinkWriter.java | 50 ++++++
7 files changed, 442 insertions(+)
diff --git a/docs/en/connector-v2/sink/Socket.md b/docs/en/connector-v2/sink/Socket.md
new file mode 100644
index 000000000..7339f7b01
--- /dev/null
+++ b/docs/en/connector-v2/sink/Socket.md
@@ -0,0 +1,89 @@
+# Socket
+
+> Socket sink connector
+
+## Description
+
+Used to send data to Socket Server. Both support streaming and batch mode.
+> For example, if the data from upstream is [`age: 12, name: jared`], the content send to socket server is the following: `{"name":"jared","age":17}`
+
+
+## Options
+
+| name | type | required | default value |
+| --- |--------|----------|---------------|
+| host | String | Yes | - |
+| port | Integer | yes | - |
+| max_retries | Integer | No | 3 |
+
+### host [string]
+socket server host
+
+### port [integer]
+
+socket server port
+
+### max_retries [integer]
+
+The number of retries to send record failed
+
+## Example
+
+simple:
+
+```hocon
+Socket {
+ host = "localhost"
+ port = 9999
+ }
+```
+
+test:
+
+* Configuring the SeaTunnel config file
+
+```hocon
+env {
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ sql = "select name, age from fake"
+}
+
+sink {
+ Socket {
+ host = "localhost"
+ port = 9999
+ }
+}
+
+```
+
+* Start a port listening
+
+```shell
+nc -l -v 9999
+```
+
+* Start a SeaTunnel task
+
+
+* Socket Server Console print data
+
+```text
+{"name":"jared","age":17}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 0efd44f1f..7d92053ad 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -120,3 +120,5 @@ seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
seatunnel.sink.FtpFile = connector-file-ftp
+seatunnel.sink.Socket = connector-socket
+
diff --git a/seatunnel-connectors-v2/connector-socket/pom.xml b/seatunnel-connectors-v2/connector-socket/pom.xml
index dca9bf524..fab1c5f3e 100644
--- a/seatunnel-connectors-v2/connector-socket/pom.xml
+++ b/seatunnel-connectors-v2/connector-socket/pom.xml
@@ -35,6 +35,12 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
new file mode 100644
index 000000000..9e93336b7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SinkConfig implements Serializable {
+ public static final String HOST = "host";
+ public static final String PORT = "port";
+ private static final String MAX_RETRIES = "max_retries";
+ private static final int DEFAULT_MAX_RETRIES = 3;
+ private String host;
+ private int port;
+ private int maxNumRetries = DEFAULT_MAX_RETRIES;
+
+ public SinkConfig(Config config) {
+ this.host = config.getString(HOST);
+ this.port = config.getInt(PORT);
+ if (config.hasPath(MAX_RETRIES)) {
+ this.maxNumRetries = config.getInt(MAX_RETRIES);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
new file mode 100644
index 000000000..77a38b1c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+@Slf4j
+public class SocketClient {
+
+ private final String hostName;
+ private final int port;
+ private int retries;
+ private final int maxNumRetries;
+ private transient Socket client;
+ private transient OutputStream outputStream;
+ private final SerializationSchema serializationSchema;
+ private volatile boolean isRunning = Boolean.TRUE;
+ private static final int CONNECTION_RETRY_DELAY = 500;
+
+ public SocketClient(SinkConfig config, SerializationSchema serializationSchema) {
+ this.hostName = config.getHost();
+ this.port = config.getPort();
+ this.serializationSchema = serializationSchema;
+ retries = config.getMaxNumRetries();
+ maxNumRetries = config.getMaxNumRetries();
+ }
+
+ private void createConnection() throws IOException {
+ client = new Socket(hostName, port);
+ client.setKeepAlive(true);
+ client.setTcpNoDelay(true);
+
+ outputStream = client.getOutputStream();
+ }
+
+ public void open() throws IOException {
+ try {
+ synchronized (SocketClient.class) {
+ createConnection();
+ }
+ } catch (IOException e) {
+ throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
+ }
+ }
+
+ public void wirte(SeaTunnelRow row) throws IOException {
+ byte[] msg = serializationSchema.serialize(row);
+ try {
+ outputStream.write(msg);
+ outputStream.flush();
+
+ } catch (IOException e) {
+ // if no re-tries are enable, fail immediately
+ if (maxNumRetries == 0) {
+ throw new IOException(
+ "Failed to send message '"
+ + row
+ + "' to socket server at "
+ + hostName
+ + ":"
+ + port
+ + ". Connection re-tries are not enabled.",
+ e);
+ }
+
+ log.error(
+ "Failed to send message '"
+ + row
+ + "' to socket server at "
+ + hostName
+ + ":"
+ + port
+ + ". Trying to reconnect...",
+ e);
+
+ synchronized (SocketClient.class) {
+ IOException lastException = null;
+ retries = 0;
+
+ while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
+
+ // first, clean up the old resources
+ try {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ } catch (IOException ee) {
+ log.error("Could not close output stream from failed write attempt", ee);
+ }
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (IOException ee) {
+ log.error("Could not close socket from failed write attempt", ee);
+ }
+
+ // try again
+ retries++;
+
+ try {
+ // initialize a new connection
+ createConnection();
+ outputStream.write(msg);
+ return;
+ } catch (IOException ee) {
+ lastException = ee;
+ log.error(
+ "Re-connect to socket server and send message failed. Retry time(s): "
+ + retries,
+ ee);
+ }
+ try {
+ this.wait(CONNECTION_RETRY_DELAY);
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "unable to write; interrupted while doing another attempt", e);
+ }
+ }
+
+ if (isRunning) {
+ throw new IOException(
+ "Failed to send message '"
+ + row
+ + "' to socket server at "
+ + hostName
+ + ":"
+ + port
+ + ". Failed after "
+ + retries
+ + " retries.",
+ lastException);
+ }
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ isRunning = false;
+ synchronized (this) {
+ this.notifyAll();
+ try {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
new file mode 100644
index 000000000..3277ef518
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+ private Config pluginConfig;
+ private SinkConfig sinkConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public String getPluginName() {
+ return "Socket";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SinkConfig.PORT, SinkConfig.HOST);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+ }
+ sinkConfig = new SinkConfig(pluginConfig);
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+ return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
new file mode 100644
index 000000000..9241c9e57
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import java.io.IOException;
+
+public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+ private final SocketClient socketClient;
+ private final SerializationSchema serializationSchema;
+ private final SinkConfig sinkConfig;
+
+ SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws IOException {
+ this.sinkConfig = sinkConfig;
+ this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+ this.socketClient = new SocketClient(sinkConfig, serializationSchema);
+ socketClient.open();
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ socketClient.wirte(element);
+ }
+
+ @Override
+ public void close() throws IOException {
+ socketClient.close();
+ }
+}