You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/23 12:32:14 UTC
[flink] branch release-1.11 updated: [FLINK-18380][examples-table]
Add a ChangelogSocketExample
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 18cd958 [FLINK-18380][examples-table] Add a ChangelogSocketExample
18cd958 is described below
commit 18cd958b67fffeb3acc4ecf4cba712733bdb554a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jun 19 15:03:07 2020 +0200
[FLINK-18380][examples-table] Add a ChangelogSocketExample
---
flink-examples/flink-examples-table/pom.xml | 40 ++++++--
.../java/{ => basics}/StreamSQLExample.java | 2 +-
.../java/{ => basics}/StreamWindowSQLExample.java | 2 +-
.../examples/java/{ => basics}/WordCountSQL.java | 2 +-
.../examples/java/{ => basics}/WordCountTable.java | 2 +-
.../java/connectors/ChangelogCsvDeserializer.java | 98 +++++++++++++++++++
.../java/connectors/ChangelogCsvFormat.java | 71 ++++++++++++++
.../java/connectors/ChangelogCsvFormatFactory.java | 76 +++++++++++++++
.../java/connectors/ChangelogSocketExample.java | 96 ++++++++++++++++++
.../java/connectors/SocketDynamicTableFactory.java | 104 ++++++++++++++++++++
.../java/connectors/SocketDynamicTableSource.java | 90 +++++++++++++++++
.../java/connectors/SocketSourceFunction.java | 107 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 17 ++++
13 files changed, 695 insertions(+), 12 deletions(-)
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index 582900e..cd4ebcf 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -110,12 +110,12 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.table.examples.java.StreamSQLExample</program-class>
+ <program-class>org.apache.flink.table.examples.java.basics.StreamSQLExample</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/table/examples/java/StreamSQLExample*</include>
+ <include>org/apache/flink/table/examples/java/basics/StreamSQLExample*</include>
</includes>
</configuration>
</execution>
@@ -132,12 +132,12 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.table.examples.java.StreamWindowSQLExample</program-class>
+ <program-class>org.apache.flink.table.examples.java.basics.StreamWindowSQLExample</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/table/examples/java/StreamWindowSQLExample*</include>
+ <include>org/apache/flink/table/examples/java/basics/StreamWindowSQLExample*</include>
</includes>
</configuration>
</execution>
@@ -154,12 +154,12 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.table.examples.java.WordCountSQL</program-class>
+ <program-class>org.apache.flink.table.examples.java.basics.WordCountSQL</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/table/examples/java/WordCountSQL*</include>
+ <include>org/apache/flink/table/examples/java/basics/WordCountSQL*</include>
</includes>
</configuration>
</execution>
@@ -176,12 +176,35 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.table.examples.java.WordCountTable</program-class>
+ <program-class>org.apache.flink.table.examples.java.basics.WordCountTable</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/table/examples/java/WordCountTable*</include>
+ <include>org/apache/flink/table/examples/java/basics/WordCountTable*</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>ChangelogSocketExample</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+
+ <configuration>
+ <classifier>ChangelogSocketExample</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.table.examples.java.connectors.ChangelogSocketExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <!-- the service file lists the connector/format factories, so the whole connectors
+ package (factories, format, source function, deserializer) must be packaged -->
+ <includes>
+ <include>org/apache/flink/table/examples/java/connectors/*</include>
+ <include>**/META-INF/services/*</include>
</includes>
</configuration>
</execution>
@@ -247,6 +270,7 @@ under the License.
<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-WordCountTable.jar" tofile="${project.basedir}/target/WordCountTable.jar"/>
<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-StreamTableExample.jar" tofile="${project.basedir}/target/StreamTableExample.jar"/>
<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-TPCHQuery3Table.jar" tofile="${project.basedir}/target/TPCHQuery3Table.jar"/>
+ <copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-ChangelogSocketExample.jar" tofile="${project.basedir}/target/ChangelogSocketExample.jar"/>
</target>
</configuration>
</execution>
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
index 7c71a44..1d40f15 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
index 0e8d2db..3daf236 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
index 1851fffb..3046ff5 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
index 385da2e..8bb5077 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java
new file mode 100644
index 0000000..90a91d5
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.RuntimeConverter.Context;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * The {@link ChangelogCsvDeserializer} contains a simple parsing logic for converting bytes into
+ * {@link Row} of {@link Integer} and {@link String} with a {@link RowKind}.
+ *
+ * <p>A message is decoded as UTF-8 text and split by the configured (literal) column delimiter.
+ * The first column must name a {@link RowKind} constant (e.g. {@code INSERT} or {@code DELETE});
+ * the following columns are parsed according to the given logical types.
+ *
+ * <p>The final conversion step converts those into internal data structures.
+ */
+public final class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
+
+	/** Logical types of the fields that follow the changelog flag, in column order. */
+	private final List<LogicalType> parsingTypes;
+	/** Converts the parsed external {@link Row} into internal {@link RowData}. */
+	private final DataStructureConverter converter;
+	/** Type information handed to Flink's core interfaces. */
+	private final TypeInformation<RowData> producedTypeInfo;
+	/** Literal (non-regex) delimiter between columns. */
+	private final String columnDelimiter;
+
+	public ChangelogCsvDeserializer(
+			List<LogicalType> parsingTypes,
+			DataStructureConverter converter,
+			TypeInformation<RowData> producedTypeInfo,
+			String columnDelimiter) {
+		this.parsingTypes = parsingTypes;
+		this.converter = converter;
+		this.producedTypeInfo = producedTypeInfo;
+		this.columnDelimiter = columnDelimiter;
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		// return the type information required by Flink's core interfaces
+		return producedTypeInfo;
+	}
+
+	@Override
+	public void open(InitializationContext context) {
+		// converters must be opened
+		converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
+	}
+
+	/**
+	 * Parses one message into a row with the {@link RowKind} given by its first column.
+	 *
+	 * @throws IllegalArgumentException if the changelog flag is not a {@link RowKind} name, the
+	 *     message has fewer columns than expected, a field type is unsupported, or an
+	 *     {@code INT} column is not a valid integer
+	 */
+	@Override
+	public RowData deserialize(byte[] message) {
+		// decode explicitly as UTF-8 instead of relying on the platform's default charset
+		final String line = new String(message, StandardCharsets.UTF_8);
+		// parse the columns including a changelog flag; the negative limit keeps trailing
+		// empty columns so that an empty last STRING field is not silently dropped
+		final String[] columns = line.split(Pattern.quote(columnDelimiter), -1);
+		if (columns.length < parsingTypes.size() + 1) {
+			throw new IllegalArgumentException(
+				"Expected a changelog flag followed by " + parsingTypes.size() +
+					" columns, but got: " + line);
+		}
+		final RowKind kind = RowKind.valueOf(columns[0]);
+		final Row row = new Row(kind, parsingTypes.size());
+		for (int i = 0; i < parsingTypes.size(); i++) {
+			row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
+		}
+		// convert to internal data structure
+		return (RowData) converter.toInternal(row);
+	}
+
+	/** Converts a single column value into the external object for the given type root. */
+	private static Object parse(LogicalTypeRoot root, String value) {
+		switch (root) {
+			case INTEGER:
+				return Integer.parseInt(value);
+			case VARCHAR:
+				return value;
+			default:
+				throw new IllegalArgumentException("Unsupported type for parsing: " + root);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(RowData nextElement) {
+		// the socket stream is unbounded
+		return false;
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java
new file mode 100644
index 0000000..c8c8689
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+
+/**
+ * The {@link ChangelogCsvFormat} is a decoding format that uses a {@link DeserializationSchema} during
+ * runtime. It supports emitting {@code INSERT} and {@code DELETE} changes.
+ */
+public final class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+ private final String columnDelimiter;
+
+ public ChangelogCsvFormat(String columnDelimiter) {
+ this.columnDelimiter = columnDelimiter;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) {
+ // create type information for the DeserializationSchema
+ final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+
+ // most of the code in DeserializationSchema will not work on internal data structures
+ // create a converter for conversion at the end
+ final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
+
+ // use logical types during runtime for parsing
+ final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
+
+ // create runtime class
+ return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ // define that this format can produce INSERT and DELETE rows
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java
new file mode 100644
index 0000000..a61836a
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Factory that turns the options of the {@code changelog-csv} format into a
+ * {@link ChangelogCsvFormat}.
+ *
+ * <p>When used from {@link SocketDynamicTableFactory}, the {@link FactoryUtil} handles the
+ * format prefix, so a table option such as {@code changelog-csv.column-delimiter} is seen here
+ * under its plain key {@code column-delimiter}.
+ *
+ * <p>Since this is a {@link DeserializationFormatFactory}, other connectors that accept
+ * deserialization formats (for example the Kafka connector) could use it as well.
+ */
+public final class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
+
+	/** Delimiter between the columns of a record; defaults to {@code "|"}. */
+	public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions
+		.key("column-delimiter")
+		.stringType()
+		.defaultValue("|");
+
+	/** Name matched against {@code 'format' = '...'}. */
+	private static final String IDENTIFIER = "changelog-csv";
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		// every option of this format has a default value
+		return Collections.emptySet();
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		return new HashSet<>(Collections.<ConfigOption<?>>singletonList(COLUMN_DELIMITER));
+	}
+
+	@Override
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+		// read the (already validated) delimiter, falling back to the option's default
+		final String delimiter = formatOptions.get(COLUMN_DELIMITER);
+		return new ChangelogCsvFormat(delimiter);
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
new file mode 100644
index 0000000..dae9197
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.Row;
+
+/**
+ * Example for implementing a custom {@link DynamicTableSource} and a {@link DecodingFormat}.
+ *
+ * <p>The example implements a table source with a decoding format that supports changelog semantics.
+ *
+ * <p>The {@link SocketDynamicTableFactory} illustrates how connector components play together. It can
+ * serve as a reference implementation for implementing own connectors and/or formats.
+ *
+ * <p>The {@link SocketDynamicTableSource} uses a simple single-threaded {@link SourceFunction} to open
+ * a socket that listens for incoming bytes. The raw bytes are decoded into rows by a pluggable format.
+ * The format expects a changelog flag as the first column.
+ *
+ * <p>In particular, the example shows how to
+ * <ul>
+ *     <li>create factories that parse and validate options,
+ *     <li>implement table connectors,
+ *     <li>implement and discover custom formats,
+ *     <li>and use provided utilities such as data structure converters and the {@link FactoryUtil}.
+ * </ul>
+ *
+ * <p>Usage: {@code ChangelogSocketExample --hostname <localhost> --port <9999>}
+ *
+ * <p>Use the following command to ingest data in a terminal:
+ * <pre>
+ *     nc -lk 9999
+ *     INSERT|Alice|12
+ *     INSERT|Bob|5
+ *     DELETE|Alice|12
+ *     INSERT|Alice|18
+ * </pre>
+ *
+ * <p>The result is written to stdout.
+ */
+public final class ChangelogSocketExample {
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String hostname = params.get("hostname", "localhost");
+		final String port = params.get("port", "9999");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1); // source only supports parallelism of 1
+
+		final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		// register a table in the catalog; command-line values are embedded as escaped
+		// SQL string literals so that a quote character cannot break the DDL
+		tEnv.executeSql(
+			"CREATE TABLE UserScores (name STRING, score INT)\n" +
+			"WITH (\n" +
+			" 'connector' = 'socket',\n" +
+			" 'hostname' = " + toSqlStringLiteral(hostname) + ",\n" +
+			" 'port' = " + toSqlStringLiteral(port) + ",\n" +
+			" 'byte-delimiter' = '10',\n" +
+			" 'format' = 'changelog-csv',\n" +
+			" 'changelog-csv.column-delimiter' = '|'\n" +
+			")");
+
+		// define a dynamic aggregating query
+		final Table result = tEnv.sqlQuery("SELECT name, SUM(score) FROM UserScores GROUP BY name");
+
+		// print the result to the console
+		tEnv.toRetractStream(result, Row.class).print();
+
+		env.execute();
+	}
+
+	/**
+	 * Wraps the given value in single quotes, doubling any embedded single quote as required by
+	 * SQL string literal syntax.
+	 */
+	private static String toSqlStringLiteral(String value) {
+		return "'" + value.replace("'", "''") + "'";
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
new file mode 100644
index 0000000..5a61d29
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link SocketDynamicTableFactory} translates the catalog table to a table source.
+ *
+ * <p>Because the table source requires a decoding format, we are discovering the format using the
+ * provided {@link FactoryUtil} for convenience.
+ */
+public final class SocketDynamicTableFactory implements DynamicTableSourceFactory {
+
+	// define all options statically
+	public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
+		.stringType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
+		.intType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
+		.intType()
+		.defaultValue(10); // corresponds to '\n'
+
+	@Override
+	public String factoryIdentifier() {
+		return "socket"; // used for matching to `connector = '...'`
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(HOSTNAME);
+		options.add(PORT);
+		options.add(FactoryUtil.FORMAT); // use pre-defined option for format
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(BYTE_DELIMITER);
+		return options;
+	}
+
+	/**
+	 * Validates the table options and creates a {@link SocketDynamicTableSource}.
+	 *
+	 * @throws IllegalArgumentException if the port is outside 1-65535 or the byte delimiter is
+	 *     outside 0-255
+	 */
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		// either implement your custom validation logic here ...
+		// or use the provided helper utility
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		// discover a suitable decoding format
+		final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
+			DeserializationFormatFactory.class,
+			FactoryUtil.FORMAT);
+
+		// validate all options
+		helper.validate();
+
+		// get the validated options
+		final ReadableConfig options = helper.getOptions();
+		final String hostname = options.get(HOSTNAME);
+		final int port = options.get(PORT);
+		final int byteDelimiterValue = options.get(BYTE_DELIMITER);
+
+		// a TCP connection can only be opened to ports 1-65535; fail during planning instead of at runtime
+		if (port < 1 || port > 65535) {
+			throw new IllegalArgumentException(
+				"Option '" + PORT.key() + "' must be in the range [1, 65535] but was: " + port);
+		}
+		// the delimiter must denote a single byte; casting larger values would silently truncate them
+		if (byteDelimiterValue < 0 || byteDelimiterValue > 255) {
+			throw new IllegalArgumentException(
+				"Option '" + BYTE_DELIMITER.key() + "' must be in the range [0, 255] but was: " + byteDelimiterValue);
+		}
+		// values 128-255 become negative Java bytes with the same bit pattern
+		final byte byteDelimiter = (byte) byteDelimiterValue;
+
+		// derive the produced data type (excluding computed columns) from the catalog table
+		final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+		// create and return dynamic table source
+		return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java
new file mode 100644
index 0000000..102e4bd
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * Planning-time representation of the socket connector.
+ *
+ * <p>This example implements none of the optional ability interfaces (for instance
+ * {@link SupportsFilterPushDown} or {@link SupportsProjectionPushDown}). All of the interesting work
+ * therefore happens in {@link #getScanRuntimeProvider(ScanContext)}, which builds the runtime
+ * {@link SourceFunction} together with its {@link DeserializationSchema}. Both of them work on Flink's
+ * internal data structures ({@link RowData}).
+ */
+public final class SocketDynamicTableSource implements ScanTableSource {
+
+	private final String hostname;
+	private final int port;
+	private final byte byteDelimiter;
+	private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+	private final DataType producedDataType;
+
+	/**
+	 * @param hostname host the runtime source connects to
+	 * @param port port the runtime source connects to
+	 * @param byteDelimiter byte that separates two records in the received byte stream
+	 * @param decodingFormat format that creates the runtime decoder and determines the changelog mode
+	 * @param producedDataType physical row type the decoder has to produce
+	 */
+	public SocketDynamicTableSource(
+			String hostname,
+			int port,
+			byte byteDelimiter,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+			DataType producedDataType) {
+		this.hostname = hostname;
+		this.port = port;
+		this.byteDelimiter = byteDelimiter;
+		this.decodingFormat = decodingFormat;
+		this.producedDataType = producedDataType;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Socket Table Source";
+	}
+
+	@Override
+	public DynamicTableSource copy() {
+		return new SocketDynamicTableSource(
+			this.hostname,
+			this.port,
+			this.byteDelimiter,
+			this.decodingFormat,
+			this.producedDataType);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		// the changelog mode is delegated to the format here; a source could also decide it on its own
+		return this.decodingFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+		// the runtime classes built here are serialized and shipped to the cluster
+		final boolean bounded = false;
+		return SourceFunctionProvider.of(
+			new SocketSourceFunction(
+				this.hostname,
+				this.port,
+				this.byteDelimiter,
+				this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.producedDataType)),
+			bounded);
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java
new file mode 100644
index 0000000..c21e6f7
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * The {@link SocketSourceFunction} opens a socket and consumes bytes.
+ *
+ * <p>It splits records by the given byte delimiter (`\n` by default) and delegates the decoding to a
+ * pluggable {@link DeserializationSchema}.
+ *
+ * <p>Note: This is only an example and should not be used in production. The source function is not
+ * fault-tolerant and can only work with a parallelism of 1.
+ */
+public final class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
+
+ private final String hostname;
+ private final int port;
+ private final byte byteDelimiter;
+ private final DeserializationSchema<RowData> deserializer;
+
+ private volatile boolean isRunning = true;
+ private Socket currentSocket;
+
+ public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
+ this.hostname = hostname;
+ this.port = port;
+ this.byteDelimiter = byteDelimiter;
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ deserializer.open(() -> getRuntimeContext().getMetricGroup());
+ }
+
+ @Override
+ public void run(SourceContext<RowData> ctx) throws Exception {
+ while (isRunning) {
+ // open and consume from socket
+ try (final Socket socket = new Socket()) {
+ currentSocket = socket;
+ socket.connect(new InetSocketAddress(hostname, port), 0);
+ try (InputStream stream = socket.getInputStream()) {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ int b;
+ while ((b = stream.read()) >= 0) {
+ // buffer until delimiter
+ if (b != byteDelimiter) {
+ buffer.write(b);
+ }
+ // decode and emit record
+ else {
+ ctx.collect(deserializer.deserialize(buffer.toByteArray()));
+ buffer.reset();
+ }
+ }
+ }
+ } catch (Throwable t) {
+ t.printStackTrace(); // print and continue
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ try {
+ currentSocket.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+}
diff --git a/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d101135
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.examples.java.connectors.SocketDynamicTableFactory
+org.apache.flink.table.examples.java.connectors.ChangelogCsvFormatFactory