You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/23 12:32:14 UTC

[flink] branch release-1.11 updated: [FLINK-18380][examples-table] Add a ChangelogSocketExample

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 18cd958  [FLINK-18380][examples-table] Add a ChangelogSocketExample
18cd958 is described below

commit 18cd958b67fffeb3acc4ecf4cba712733bdb554a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jun 19 15:03:07 2020 +0200

    [FLINK-18380][examples-table] Add a ChangelogSocketExample
---
 flink-examples/flink-examples-table/pom.xml        |  40 ++++++--
 .../java/{ => basics}/StreamSQLExample.java        |   2 +-
 .../java/{ => basics}/StreamWindowSQLExample.java  |   2 +-
 .../examples/java/{ => basics}/WordCountSQL.java   |   2 +-
 .../examples/java/{ => basics}/WordCountTable.java |   2 +-
 .../java/connectors/ChangelogCsvDeserializer.java  |  98 +++++++++++++++++++
 .../java/connectors/ChangelogCsvFormat.java        |  71 ++++++++++++++
 .../java/connectors/ChangelogCsvFormatFactory.java |  76 +++++++++++++++
 .../java/connectors/ChangelogSocketExample.java    |  96 ++++++++++++++++++
 .../java/connectors/SocketDynamicTableFactory.java | 104 ++++++++++++++++++++
 .../java/connectors/SocketDynamicTableSource.java  |  90 +++++++++++++++++
 .../java/connectors/SocketSourceFunction.java      | 107 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  17 ++++
 13 files changed, 695 insertions(+), 12 deletions(-)

diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index 582900e..cd4ebcf 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -110,12 +110,12 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.table.examples.java.StreamSQLExample</program-class>
+									<program-class>org.apache.flink.table.examples.java.basics.StreamSQLExample</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/table/examples/java/StreamSQLExample*</include>
+								<include>org/apache/flink/table/examples/java/basics/StreamSQLExample*</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -132,12 +132,12 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.table.examples.java.StreamWindowSQLExample</program-class>
+									<program-class>org.apache.flink.table.examples.java.basics.StreamWindowSQLExample</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/table/examples/java/StreamWindowSQLExample*</include>
+								<include>org/apache/flink/table/examples/java/basics/StreamWindowSQLExample*</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -154,12 +154,12 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.table.examples.java.WordCountSQL</program-class>
+									<program-class>org.apache.flink.table.examples.java.basics.WordCountSQL</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/table/examples/java/WordCountSQL*</include>
+								<include>org/apache/flink/table/examples/java/basics/WordCountSQL*</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -176,12 +176,35 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.table.examples.java.WordCountTable</program-class>
+									<program-class>org.apache.flink.table.examples.java.basics.WordCountTable</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/table/examples/java/WordCountTable*</include>
+								<include>org/apache/flink/table/examples/java/basics/WordCountTable*</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<execution>
+						<id>ChangelogSocketExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+
+						<configuration>
+							<classifier>ChangelogSocketExample</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.table.examples.java.connectors.ChangelogSocketExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes> <!-- ship every class of the connectors example (factories, source, format), not only the main class -->
+								<include>org/apache/flink/table/examples/java/connectors/*</include>
+								<include>**/META-INF/services/*</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -247,6 +270,7 @@ under the License.
 								<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-WordCountTable.jar" tofile="${project.basedir}/target/WordCountTable.jar"/>
 								<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-StreamTableExample.jar" tofile="${project.basedir}/target/StreamTableExample.jar"/>
 								<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-TPCHQuery3Table.jar" tofile="${project.basedir}/target/TPCHQuery3Table.jar"/>
+								<copy file="${project.basedir}/target/flink-examples-table_${scala.binary.version}-${project.version}-ChangelogSocketExample.jar" tofile="${project.basedir}/target/ChangelogSocketExample.jar"/>
 							</target>
 						</configuration>
 					</execution>
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
index 7c71a44..1d40f15 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
 
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
index 0e8d2db..3daf236 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamWindowSQLExample.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
index 1851fffb..3046ff5 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountSQL.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
similarity index 98%
rename from flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java
rename to flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
index 385da2e..8bb5077 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/WordCountTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.examples.java;
+package org.apache.flink.table.examples.java.basics;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java
new file mode 100644
index 0000000..90a91d5
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.RuntimeConverter.Context;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * The {@link ChangelogCsvDeserializer} contains a simple parsing logic for converting UTF-8 encoded bytes into
+ * {@link Row} of {@link Integer} and {@link String} with a {@link RowKind} read from the first column.
+ *
+ * <p>The final conversion step converts those into internal data structures ({@link RowData}).
+ */
+public final class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
+
+	private final List<LogicalType> parsingTypes;
+	private final DataStructureConverter converter;
+	private final TypeInformation<RowData> producedTypeInfo;
+	private final String columnDelimiter;
+
+	public ChangelogCsvDeserializer(
+			List<LogicalType> parsingTypes,
+			DataStructureConverter converter,
+			TypeInformation<RowData> producedTypeInfo,
+			String columnDelimiter) {
+		this.parsingTypes = parsingTypes;
+		this.converter = converter;
+		this.producedTypeInfo = producedTypeInfo;
+		this.columnDelimiter = columnDelimiter;
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		// return the type information required by Flink's core interfaces
+		return producedTypeInfo;
+	}
+
+	@Override
+	public void open(InitializationContext context) {
+		// converters must be opened before toInternal() is called in deserialize()
+		converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
+	}
+
+	@Override
+	public RowData deserialize(byte[] message) {
+		// decode as UTF-8 (instead of the platform default charset) and split into the changelog flag plus one column per field
+		final String[] columns = new String(message, java.nio.charset.StandardCharsets.UTF_8).split(Pattern.quote(columnDelimiter));
+		final RowKind kind = RowKind.valueOf(columns[0]);
+		final Row row = new Row(kind, parsingTypes.size());
+		for (int i = 0; i < parsingTypes.size(); i++) {
+			row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1])); // column 0 holds the RowKind
+		}
+		// convert to internal data structure
+		return (RowData) converter.toInternal(row);
+	}
+
+	private static Object parse(LogicalTypeRoot root, String value) {
+		switch (root) {
+			case INTEGER:
+				return Integer.parseInt(value);
+			case VARCHAR:
+				return value;
+			default:
+				throw new IllegalArgumentException("Unsupported logical type root for parsing: " + root);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(RowData nextElement) {
+		return false; // this schema never signals the end of the stream
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java
new file mode 100644
index 0000000..c8c8689
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+
+/**
+ * The {@link ChangelogCsvFormat} is a decoding format that creates a {@link ChangelogCsvDeserializer} (a {@link DeserializationSchema})
+ * for runtime and hands it the configured column delimiter. It supports emitting {@code INSERT} and {@code DELETE} changes.
+ */
+public final class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+	private final String columnDelimiter;
+
+	public ChangelogCsvFormat(String columnDelimiter) {
+		this.columnDelimiter = columnDelimiter;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) {
+		// create type information for the DeserializationSchema (unchecked cast to TypeInformation<RowData>, hence the suppression)
+		final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+
+		// most of the code in DeserializationSchema will not work on internal data structures
+		// create a converter for conversion at the end (the deserializer calls its toInternal() on the parsed Row)
+		final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
+
+		// use the logical types of the children of the produced type during runtime for parsing
+		final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
+
+		// create runtime class
+		return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		// define that this format can produce INSERT and DELETE rows (no other row kinds are declared)
+		return ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java
new file mode 100644
index 0000000..a61836a
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormatFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link ChangelogCsvFormatFactory} translates format-specific options to a {@link ChangelogCsvFormat}.
+ *
+ * <p>The {@link FactoryUtil} in {@link SocketDynamicTableFactory} takes care of adapting the option
+ * keys accordingly and handles the prefixing like {@code changelog-csv.column-delimiter}.
+ *
+ * <p>Because this factory implements {@link DeserializationFormatFactory}, it could also be used for
+ * other connectors that support deserialization formats such as the Kafka connector.
+ */
+public final class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
+
+	// define all options statically; the column delimiter is optional and defaults to "|"
+	public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
+		.stringType()
+		.defaultValue("|");
+
+	@Override
+	public String factoryIdentifier() {
+		return "changelog-csv";
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		return Collections.emptySet();
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(COLUMN_DELIMITER);
+		return options;
+	}
+
+	@Override
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+		// validate the declared required/optional options with the provided helper before reading them
+		FactoryUtil.validateFactoryOptions(this, formatOptions);
+		final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
+		// create and return the format
+		return new ChangelogCsvFormat(columnDelimiter);
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
new file mode 100644
index 0000000..dae9197
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.Row;
+
+/**
+ * Example for implementing a custom {@link DynamicTableSource} and a {@link DecodingFormat}.
+ *
+ * <p>The example implements a table source with a decoding format that supports changelog semantics.
+ *
+ * <p>The {@link SocketDynamicTableFactory} illustrates how connector components play together. It can
+ * serve as a reference implementation for implementing own connectors and/or formats.
+ *
+ * <p>The {@link SocketDynamicTableSource} uses a simple single-threaded {@link SourceFunction} to open
+ * a socket that listens for incoming bytes. The raw bytes are decoded into rows by a pluggable format.
+ * The format expects a changelog flag as the first column.
+ *
+ * <p>In particular, the example shows how to
+ * <ul>
+ *     <li>create factories that parse and validate options,
+ *     <li>implement table connectors,
+ *     <li>implement and discover custom formats,
+ *     <li>and use provided utilities such as data structure converters and the {@link FactoryUtil}.
+ * </ul>
+ *
+ * <p>Usage: <code>ChangelogSocketExample --hostname &lt;localhost&gt; --port &lt;9999&gt;</code>
+ *
+ * <p>To ingest data, run the {@code nc} command shown first in a terminal and type the rows below it (flag, name, score separated by '|'):
+ * <pre>
+ *     nc -lk 9999
+ *     INSERT|Alice|12
+ *     INSERT|Bob|5
+ *     DELETE|Alice|12
+ *     INSERT|Alice|18
+ * </pre>
+ *
+ * <p>The result is written to stdout.
+ */
+public final class ChangelogSocketExample {
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String hostname = params.get("hostname", "localhost");
+		final String port = params.get("port", "9999");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1); // source only supports parallelism of 1
+
+		final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		// register a table in the catalog; hostname and port are spliced into the DDL as given on the command line
+		tEnv.executeSql(
+			"CREATE TABLE UserScores (name STRING, score INT)\n" +
+			"WITH (\n" +
+			"  'connector' = 'socket',\n" +
+			"  'hostname' = '" + hostname + "',\n" +
+			"  'port' = '" + port + "',\n" +
+			"  'byte-delimiter' = '10',\n" +
+			"  'format' = 'changelog-csv',\n" +
+			"  'changelog-csv.column-delimiter' = '|'\n" +
+			")");
+
+		// define a dynamic aggregating query (sum of scores per name)
+		final Table result = tEnv.sqlQuery("SELECT name, SUM(score) FROM UserScores GROUP BY name");
+
+		// convert the result into a retract stream of rows and print it to the console
+		tEnv.toRetractStream(result, Row.class).print();
+
+		env.execute();
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
new file mode 100644
index 0000000..5a61d29
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link SocketDynamicTableFactory} translates the catalog table to a {@link SocketDynamicTableSource}.
+ *
+ * <p>Because the table source requires a decoding format, we are discovering the format using the
+ * provided {@link FactoryUtil} for convenience.
+ */
+public final class SocketDynamicTableFactory implements DynamicTableSourceFactory {
+
+	// define all options statically; hostname and port have no default value and are listed as required below
+	public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
+		.stringType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
+		.intType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
+		.intType()
+		.defaultValue(10); // corresponds to '\n'
+
+	@Override
+	public String factoryIdentifier() {
+		return "socket"; // used for matching to `connector = '...'`
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(HOSTNAME);
+		options.add(PORT);
+		options.add(FactoryUtil.FORMAT); // use pre-defined option for format
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(BYTE_DELIMITER);
+		return options;
+	}
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		// either implement your custom validation logic here ...
+		// or use the provided helper utility
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		// discover a suitable decoding format for the pre-defined 'format' option
+		final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
+			DeserializationFormatFactory.class,
+			FactoryUtil.FORMAT);
+
+		// validate all options (done after the format discovery above)
+		helper.validate();
+
+		// get the validated options
+		final ReadableConfig options = helper.getOptions();
+		final String hostname = options.get(HOSTNAME);
+		final int port = options.get(PORT);
+		final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER); // unbox to int first, then narrow to byte
+
+		// derive the produced data type (excluding computed columns) from the catalog table
+		final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+		// create and return dynamic table source
+		return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java
new file mode 100644
index 0000000..102e4bd
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketDynamicTableSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * Planning-time representation of the socket table, implemented as a {@link ScanTableSource}.
+ *
+ * <p>None of the available ability interfaces (for example {@link SupportsFilterPushDown} or
+ * {@link SupportsProjectionPushDown}) are implemented in this example. The main logic is therefore
+ * located in {@link #getScanRuntimeProvider(ScanContext)}, which instantiates the {@link SourceFunction}
+ * and its {@link DeserializationSchema} for runtime. Both instances are parameterized to return
+ * internal data structures (i.e. {@link RowData}).
+ */
+public final class SocketDynamicTableSource implements ScanTableSource {
+
+	private final String hostname;
+	private final int port;
+	private final byte byteDelimiter;
+	private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+	private final DataType producedDataType;
+
+	/**
+	 * Stores everything that is needed to create the runtime source function later on.
+	 *
+	 * @param hostname host the runtime source connects to
+	 * @param port port the runtime source connects to
+	 * @param byteDelimiter byte that separates two records in the socket stream
+	 * @param decodingFormat format that creates the runtime decoder and decides about the changelog mode
+	 * @param producedDataType data type handed to the format when the runtime decoder is created
+	 */
+	public SocketDynamicTableSource(
+			String hostname,
+			int port,
+			byte byteDelimiter,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+			DataType producedDataType) {
+		this.decodingFormat = decodingFormat;
+		this.producedDataType = producedDataType;
+		this.hostname = hostname;
+		this.port = port;
+		this.byteDelimiter = byteDelimiter;
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		// the source itself could decide about the changelog mode as well,
+		// but in this example the decision is left to the format
+		return decodingFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+		// everything created here is a runtime class that is shipped to the cluster
+		final DeserializationSchema<RowData> runtimeDecoder =
+			decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
+
+		return SourceFunctionProvider.of(
+			new SocketSourceFunction(hostname, port, byteDelimiter, runtimeDecoder),
+			false);
+	}
+
+	@Override
+	public DynamicTableSource copy() {
+		// all fields are final, so the copy can share them with this instance
+		return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Socket Table Source";
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java
new file mode 100644
index 0000000..c21e6f7
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * The {@link SocketSourceFunction} opens a socket and consumes bytes.
+ *
+ * <p>It splits records by the given byte delimiter ({@code \n} by default) and delegates the decoding to a
+ * pluggable {@link DeserializationSchema}. Bytes that are still buffered when the peer closes the
+ * connection (i.e. a trailing record without delimiter) are discarded. Whenever the connection ends or
+ * fails, the function waits one second and connects again until it is canceled.
+ *
+ * <p>Note: This is only an example and should not be used in production. The source function is not
+ * fault-tolerant and can only work with a parallelism of 1.
+ */
+public final class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
+
+	private final String hostname;
+	private final int port;
+	private final byte byteDelimiter;
+	private final DeserializationSchema<RowData> deserializer;
+
+	private volatile boolean isRunning = true;
+
+	// written by run() and read by cancel(), which may be invoked from a different thread;
+	// transient because a socket is runtime state and must never be shipped with the function
+	private transient volatile Socket currentSocket;
+
+	/**
+	 * Creates a source function that reads delimited records from the given endpoint.
+	 *
+	 * @param hostname host to connect to
+	 * @param port port to connect to
+	 * @param byteDelimiter byte that terminates a record
+	 * @param deserializer schema that turns the bytes of one record into a {@link RowData}
+	 */
+	public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
+		this.hostname = hostname;
+		this.port = port;
+		this.byteDelimiter = byteDelimiter;
+		this.deserializer = deserializer;
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return deserializer.getProducedType();
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		deserializer.open(() -> getRuntimeContext().getMetricGroup());
+	}
+
+	/**
+	 * Connects to the socket, emits one record per delimited chunk of bytes and reconnects after the
+	 * connection ended or failed. Returns once {@link #cancel()} has been called.
+	 */
+	@Override
+	public void run(SourceContext<RowData> ctx) throws Exception {
+		// InputStream#read() returns a byte as an int in the range 0..255 whereas a Java byte is
+		// signed; without this conversion a delimiter >= 0x80 would never match
+		final int delimiter = byteDelimiter & 0xFF;
+
+		while (isRunning) {
+			// open and consume from socket
+			try (final Socket socket = new Socket()) {
+				currentSocket = socket;
+				if (!isRunning) {
+					// cancel() ran before the socket was published and therefore could not close it
+					break;
+				}
+				socket.connect(new InetSocketAddress(hostname, port), 0);
+				try (InputStream stream = socket.getInputStream()) {
+					final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+					int b;
+					while ((b = stream.read()) >= 0) {
+						if (b != delimiter) {
+							// buffer until delimiter
+							buffer.write(b);
+						} else {
+							// decode and emit record
+							final RowData row = deserializer.deserialize(buffer.toByteArray());
+							if (row != null) {
+								// a schema may return null for input it cannot decode
+								ctx.collect(row);
+							}
+							buffer.reset();
+						}
+					}
+				}
+			} catch (Exception e) {
+				// a failure after cancel() is the expected result of closing the socket
+				if (isRunning) {
+					e.printStackTrace(); // print and continue
+				}
+			}
+			if (isRunning) {
+				// wait before reconnecting, but do not delay the shutdown
+				Thread.sleep(1000);
+			}
+		}
+	}
+
+	/**
+	 * Stops the loop in {@link #run(SourceContext)} and closes the socket that is currently in use.
+	 */
+	@Override
+	public void cancel() {
+		isRunning = false;
+		final Socket socket = currentSocket;
+		if (socket == null) {
+			// run() has not created a socket yet
+			return;
+		}
+		try {
+			// closing the socket makes a blocking connect() or read() in run() fail
+			socket.close();
+		} catch (Exception ignored) {
+			// best effort: the source is shutting down anyway
+		}
+	}
+}
diff --git a/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d101135
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.examples.java.connectors.SocketDynamicTableFactory
+org.apache.flink.table.examples.java.connectors.ChangelogCsvFormatFactory