You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/28 11:49:40 UTC

[flink] branch master updated (d5e472a -> 8f51746)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d5e472a  [FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
     new 45ba303  [FLINK-26842][python][tests] Port Scala code to Java
     new 8f51746  [FLINK-26842][python] Remove scala-bridge dependency

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-dist/pom.xml                                 |   2 +-
 flink-dist/src/main/assemblies/opt.xml             |   4 +-
 flink-docs/pom.xml                                 |   2 +-
 flink-end-to-end-tests/flink-python-test/pom.xml   |   2 +-
 flink-python/apache-flink-libraries/setup.py       |   2 +-
 flink-python/pom.xml                               |  26 +-
 .../pyflink/table/tests/test_descriptor.py         |   4 +-
 flink-python/pyflink/testing/test_case_utils.py    |   4 +-
 .../table/legacyutils/ByteMaxAggFunction.java      |  76 +++++
 .../flink/table/legacyutils/CustomAssigner.java    |  32 +++
 .../flink/table/legacyutils/CustomExtractor.java   |  69 +++++
 .../flink/table/legacyutils/MaxAccumulator.java    |  25 ++
 .../apache/flink/table/legacyutils/RichFunc0.java  |  69 +++++
 .../flink/table/legacyutils/RowCollector.java      |  79 ++++++
 .../apache/flink/table/legacyutils/RowSink.java    |  32 +++
 .../apache/flink/table/legacyutils/TableFunc1.java |  42 +++
 .../flink/table/legacyutils/TestAppendSink.java    |  73 +++++
 .../legacyutils/TestCollectionTableFactory.java    | 307 +++++++++++++++++++++
 .../flink/table/legacyutils/TestRetractSink.java   |  63 +++++
 .../flink/table/legacyutils/TestUpsertSink.java    |  87 ++++++
 .../legacyutils/TestCollectionTableFactory.scala   | 253 -----------------
 .../legacyutils/legacyTestingDescriptors.scala     |  80 ------
 .../table/legacyutils/legacyTestingFunctions.scala | 152 ----------
 .../table/legacyutils/legacyTestingSinks.scala     | 200 --------------
 flink-table/flink-sql-client/pom.xml               |   2 +-
 25 files changed, 966 insertions(+), 721 deletions(-)
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
 create mode 100644 flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
 delete mode 100644 flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
 delete mode 100644 flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
 delete mode 100644 flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
 delete mode 100644 flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala

[flink] 02/02: [FLINK-26842][python] Remove scala-bridge dependency

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f51746a8c768ba61660e4693030349afab825a5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Mar 22 11:14:02 2022 +0100

    [FLINK-26842][python] Remove scala-bridge dependency
---
 flink-dist/pom.xml                               | 2 +-
 flink-dist/src/main/assemblies/opt.xml           | 4 ++--
 flink-docs/pom.xml                               | 2 +-
 flink-end-to-end-tests/flink-python-test/pom.xml | 2 +-
 flink-python/apache-flink-libraries/setup.py     | 2 +-
 flink-python/pom.xml                             | 8 +-------
 flink-table/flink-sql-client/pom.xml             | 2 +-
 7 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 70eda2b..61f2a51 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -446,7 +446,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-python_${scala.binary.version}</artifactId>
+			<artifactId>flink-python</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 364151b..c56ac25 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -119,9 +119,9 @@
 
 		<!-- Python -->
 		<file>
-			<source>../flink-python/target/flink-python_${scala.binary.version}-${project.version}.jar</source>
+			<source>../flink-python/target/flink-python-${project.version}.jar</source>
 			<outputDirectory>opt</outputDirectory>
-			<destName>flink-python_${scala.binary.version}-${project.version}.jar</destName>
+			<destName>flink-python-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index fb3f9fa..4cb4e29 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -127,7 +127,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-python_${scala.binary.version}</artifactId>
+			<artifactId>flink-python</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
diff --git a/flink-end-to-end-tests/flink-python-test/pom.xml b/flink-end-to-end-tests/flink-python-test/pom.xml
index 7ac723c..c5d5cec 100644
--- a/flink-end-to-end-tests/flink-python-test/pom.xml
+++ b/flink-end-to-end-tests/flink-python-test/pom.xml
@@ -57,7 +57,7 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-python_${scala.binary.version}</artifactId>
+			<artifactId>flink-python</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 	</dependencies>
diff --git a/flink-python/apache-flink-libraries/setup.py b/flink-python/apache-flink-libraries/setup.py
index d269626..0d83f73 100644
--- a/flink-python/apache-flink-libraries/setup.py
+++ b/flink-python/apache-flink-libraries/setup.py
@@ -116,7 +116,7 @@ run sdist.
         LIB_PATH = os.path.join(FLINK_HOME, "lib")
         OPT_PATH = os.path.join(FLINK_HOME, "opt")
         OPT_PYTHON_JAR_NAME = os.path.basename(
-            find_file_path(os.path.join(OPT_PATH, "flink-python_*.jar")))
+            find_file_path(os.path.join(OPT_PATH, "flink-python*.jar")))
         OPT_SQL_CLIENT_JAR_NAME = os.path.basename(
             find_file_path(os.path.join(OPT_PATH, "flink-sql-client*.jar")))
         LICENSES_PATH = os.path.join(FLINK_HOME, "licenses")
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 46371a4..5257786 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-python_${scala.binary.version}</artifactId>
+	<artifactId>flink-python</artifactId>
 	<name>Flink : Python</name>
 
 	<packaging>jar</packaging>
@@ -74,12 +74,6 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index 1906bde..bf93bc8 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -126,7 +126,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-python_${scala.binary.version}</artifactId>
+			<artifactId>flink-python</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

[flink] 01/02: [FLINK-26842][python][tests] Port Scala code to Java

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45ba3035d00f890540967693d9fd9e43b8b5e0e4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Mar 21 19:37:37 2022 +0100

    [FLINK-26842][python][tests] Port Scala code to Java
---
 flink-python/pom.xml                               |  18 --
 .../pyflink/table/tests/test_descriptor.py         |   4 +-
 flink-python/pyflink/testing/test_case_utils.py    |   4 +-
 .../table/legacyutils/ByteMaxAggFunction.java      |  76 +++++
 .../flink/table/legacyutils/CustomAssigner.java    |  32 +++
 .../flink/table/legacyutils/CustomExtractor.java   |  69 +++++
 .../flink/table/legacyutils/MaxAccumulator.java    |  25 ++
 .../apache/flink/table/legacyutils/RichFunc0.java  |  69 +++++
 .../flink/table/legacyutils/RowCollector.java      |  79 ++++++
 .../apache/flink/table/legacyutils/RowSink.java    |  32 +++
 .../apache/flink/table/legacyutils/TableFunc1.java |  42 +++
 .../flink/table/legacyutils/TestAppendSink.java    |  73 +++++
 .../legacyutils/TestCollectionTableFactory.java    | 307 +++++++++++++++++++++
 .../flink/table/legacyutils/TestRetractSink.java   |  63 +++++
 .../flink/table/legacyutils/TestUpsertSink.java    |  87 ++++++
 .../legacyutils/TestCollectionTableFactory.scala   | 253 -----------------
 .../legacyutils/legacyTestingDescriptors.scala     |  80 ------
 .../table/legacyutils/legacyTestingFunctions.scala | 152 ----------
 .../table/legacyutils/legacyTestingSinks.scala     | 200 --------------
 19 files changed, 958 insertions(+), 707 deletions(-)

diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 4204436..46371a4 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -470,24 +470,6 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-			<!-- This is only a temporary solution until FLINK-22872 is fixed. -->
-			<!-- It compiles `org.apache.flink.table.legacyutils` containing code from the old planner. -->
-			<!-- We should not start adding more Scala code. Please remove this as soon as possible. -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<executions>
-					<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
-						 Scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
 		</plugins>
 	</build>
 </project>
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index c973d9c..e63429b 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -50,8 +50,8 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
             'rowtime.timestamps.class':
                 'org.apache.flink.table.legacyutils.CustomExtractor',
             'rowtime.timestamps.serialized':
-                'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj'
-                'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
+                'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4'
+                'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
                 '50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
                 'AACdHM'}
         self.assertEqual(expected, properties)
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index c8749648..8abe31f 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -107,8 +107,8 @@ class PyFlinkTestCase(unittest.TestCase):
     @classmethod
     def to_py_list(cls, actual):
         py_list = []
-        for i in range(0, actual.length()):
-            py_list.append(actual.apply(i))
+        for i in range(0, actual.size()):
+            py_list.append(actual.get(i))
         return py_list
 
     @classmethod
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
new file mode 100644
index 0000000..ad9ebb4
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.functions.AggregateFunction;
+
+/** {@link AggregateFunction} for {@link Byte}. */
+public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> {
+
+    private static final long serialVersionUID = 1233840393767061909L;
+
+    @Override
+    public MaxAccumulator<Byte> createAccumulator() {
+        final MaxAccumulator<Byte> acc = new MaxAccumulator<>();
+        resetAccumulator(acc);
+        return acc;
+    }
+
+    public void accumulate(MaxAccumulator<Byte> acc, Byte value) {
+        if (value != null) {
+            if (!acc.f1 || Byte.compare(acc.f0, value) < 0) {
+                acc.f0 = value;
+                acc.f1 = true;
+            }
+        }
+    }
+
+    @Override
+    public Byte getValue(MaxAccumulator<Byte> acc) {
+        if (acc.f1) {
+            return acc.f0;
+        } else {
+            return null;
+        }
+    }
+
+    public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) {
+        its.forEach(
+                a -> {
+                    if (a.f1) {
+                        accumulate(acc, a.f0);
+                    }
+                });
+    }
+
+    public void resetAccumulator(MaxAccumulator<Byte> acc) {
+        acc.f0 = 0;
+        acc.f1 = false;
+    }
+
+    @Override
+    public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() {
+        return new TupleTypeInfo(
+                MaxAccumulator.class,
+                BasicTypeInfo.BYTE_TYPE_INFO,
+                BasicTypeInfo.BOOLEAN_TYPE_INFO);
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
new file mode 100644
index 0000000..d69cce5
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.types.Row;
+
+/** A watermark assigner that throws an exception if a watermark is requested. */
+public class CustomAssigner extends PunctuatedWatermarkAssigner {
+    private static final long serialVersionUID = -4900176786361416000L;
+
+    @Override
+    public Watermark getWatermark(Row row, long timestamp) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
new file mode 100644
index 0000000..cf66733
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.Preconditions;
+
+/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */
+public class CustomExtractor extends TimestampExtractor {
+    private static final long serialVersionUID = 6784900460276023738L;
+
+    private final String field = "ts";
+
+    @Override
+    public String[] getArgumentFields() {
+        return new String[] {field};
+    }
+
+    @Override
+    public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+        if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
+            throw new ValidationException(
+                    String.format(
+                            "Field 'ts' must be of type Timestamp but is of type %s.",
+                            argumentFieldTypes[0]));
+        }
+    }
+
+    @Override
+    public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+        ResolvedFieldReference fieldAccess = fieldAccesses[0];
+        Preconditions.checkArgument(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
+        FieldReferenceExpression fieldReferenceExpr =
+                new FieldReferenceExpression(
+                        fieldAccess.name(),
+                        TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
+                        0,
+                        fieldAccess.fieldIndex());
+        return ApiExpressionUtils.unresolvedCall(
+                BuiltInFunctionDefinitions.CAST,
+                fieldReferenceExpr,
+                ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
new file mode 100644
index 0000000..191aa87
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Utility class to make working with tuples more readable. */
+public class MaxAccumulator<T> extends Tuple2<T, Boolean> {
+    private static final long serialVersionUID = 6089142148200600733L;
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
new file mode 100644
index 0000000..a757fe9
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import org.junit.Assert;
+
+/**
+ * Testing scalar function to verify that lifecycle methods are called in the expected order and
+ * only once.
+ */
+public class RichFunc0 extends ScalarFunction {
+    private static final long serialVersionUID = 931156471687322386L;
+
+    private boolean openCalled = false;
+    private boolean closeCalled = false;
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        if (openCalled) {
+            Assert.fail("Open called more than once.");
+        } else {
+            openCalled = true;
+        }
+        if (closeCalled) {
+            Assert.fail("Close called before open.");
+        }
+    }
+
+    public void eval(int index) {
+        if (!openCalled) {
+            Assert.fail("Open was not called before eval.");
+        }
+        if (closeCalled) {
+            Assert.fail("Close called before eval.");
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (closeCalled) {
+            Assert.fail("Close called more than once.");
+        } else {
+            closeCalled = true;
+        }
+        if (!openCalled) {
+            Assert.fail("Open was not called before close.");
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
new file mode 100644
index 0000000..9fed516
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** A collector for storing results in memory. */
+class RowCollector {
+    private static final ArrayDeque<Tuple2<Boolean, Row>> sink = new ArrayDeque<>();
+
+    public static void addValue(Tuple2<Boolean, Row> value) {
+        synchronized (sink) {
+            sink.add(value.copy());
+        }
+    }
+
+    public static List<Tuple2<Boolean, Row>> getAndClearValues() {
+        final ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<>(sink);
+        sink.clear();
+        return out;
+    }
+
+    public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
+        final Map<String, Integer> retracted =
+                results.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        r -> r.f1.toString(),
+                                        Collectors.mapping(
+                                                r -> r.f0 ? 1 : -1,
+                                                Collectors.reducing(
+                                                        0, (left, right) -> left + right))));
+
+        if (retracted.values().stream().anyMatch(c -> c < 0)) {
+            throw new AssertionError("Received retracted rows which have not been accumulated.");
+        }
+
+        return retracted.entrySet().stream()
+                .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> e.getKey()))
+                .collect(Collectors.toList());
+    }
+
+    public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
+        final HashMap<Row, String> upserted = new HashMap<>();
+        for (Tuple2<Boolean, Row> r : results) {
+            final Row key = Row.project(r.f1, keys);
+            if (r.f0) {
+                upserted.put(key, r.f1.toString());
+            } else {
+                upserted.remove(key);
+            }
+        }
+        return new ArrayList<>(upserted.values());
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
new file mode 100644
index 0000000..0051d9c
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.types.Row;
+
+/** A sink that stores data in the {@link RowCollector}. */
+public class RowSink implements SinkFunction<Tuple2<Boolean, Row>> {
+    private static final long serialVersionUID = -7264802354440479084L;
+
+    @Override
+    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
+        RowCollector.addValue(value);
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
new file mode 100644
index 0000000..4e34e65
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.table.functions.TableFunction;
+
+/**
+ * A table function that splits its input at every {@code #} and emits the pieces one by one,
+ * optionally putting a prefix in front of each piece.
+ *
+ * <p>An input without any {@code #} produces no output.
+ */
+public class TableFunc1 extends TableFunction<String> {
+
+    private static final long serialVersionUID = -5471603822898040617L;
+
+    /** Emits each {@code #}-separated piece of {@code str}. */
+    public void eval(String str) {
+        if (!str.contains("#")) {
+            // nothing to split, so nothing is emitted
+            return;
+        }
+        final String[] pieces = str.split("#");
+        for (int i = 0; i < pieces.length; i++) {
+            collect(pieces[i]);
+        }
+    }
+
+    /** Emits each {@code #}-separated piece of {@code str} with {@code prefix} in front of it. */
+    public void eval(String str, String prefix) {
+        if (!str.contains("#")) {
+            // nothing to split, so nothing is emitted
+            return;
+        }
+        final String[] pieces = str.split("#");
+        for (int i = 0; i < pieces.length; i++) {
+            collect(prefix + pieces[i]);
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
new file mode 100644
index 0000000..cde5467
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Append-only table sink for tests that routes every row into a {@link RowSink}.
+ *
+ * <p>Each row is wrapped into a {@code (true, row)} tuple before it is handed to the sink.
+ */
+public class TestAppendSink implements AppendStreamTableSink<Row> {
+
+    // Both stay null until configure(...) produces a configured copy.
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    /** Wraps each row as {@code (true, row)} and attaches a {@link RowSink} to the result. */
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+        // RowSink consumes Tuple2<Boolean, Row>, so every row is paired with the flag 'true'.
+        final MapFunction<Row, Tuple2<Boolean, Row>> pairWithTrue =
+                new MapFunction<Row, Tuple2<Boolean, Row>>() {
+                    private static final long serialVersionUID = 4671583708680989488L;
+
+                    @Override
+                    public Tuple2<Boolean, Row> map(Row row) throws Exception {
+                        return Tuple2.of(true, row);
+                    }
+                };
+        return dataStream.map(pairWithTrue).addSink(new RowSink());
+    }
+
+    /** Builds the row type from the configured field types and field names. */
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    /** Returns a new sink carrying the given names and types; this instance is left untouched. */
+    @Override
+    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        final TestAppendSink configured = new TestAppendSink();
+        configured.fNames = fieldNames;
+        configured.fTypes = fieldTypes;
+        return configured;
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
new file mode 100644
index 0000000..dfa2db4
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Table factory for tests that creates collection-backed table sources as well as sinks.
+ *
+ * <p>The factory declares the {@link ConnectorDescriptorValidator#CONNECTOR} value {@code
+ * COLLECTION} as its required context. Sources read from the static {@code SOURCE_DATA} list,
+ * lookups are answered from {@code DIM_DATA} and sinks append to {@code RESULT}.
+ */
+public class TestCollectionTableFactory
+        implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {
+
+    // Mode handed to sources created through createTableSource(...).
+    private static boolean isStreaming = true;
+
+    private static final List<Row> SOURCE_DATA = new LinkedList<>();
+    private static final List<Row> DIM_DATA = new LinkedList<>();
+    private static final List<Row> RESULT = new LinkedList<>();
+
+    // Pause before every reachedEnd() check of the source; values <= 0 disable the pause.
+    private long emitIntervalMS = -1L;
+
+    /** Creates a source whose boundedness follows the static {@code isStreaming} flag. */
+    @Override
+    public TableSource<Row> createTableSource(Map<String, String> properties) {
+        return getCollectionSource(properties, isStreaming);
+    }
+
+    @Override
+    public TableSink<Row> createTableSink(Map<String, String> properties) {
+        return getCollectionSink(properties);
+    }
+
+    /** Creates a source that always reports itself as unbounded. */
+    @Override
+    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+        return getCollectionSource(properties, true);
+    }
+
+    @Override
+    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+        return getCollectionSink(properties);
+    }
+
+    /**
+     * Builds a source from the schema and the optional {@code parallelism} entry found in {@code
+     * props}.
+     *
+     * @param props table properties as passed to the factory
+     * @param isStreaming whether the source reports itself as unbounded
+     */
+    private CollectionTableSource getCollectionSource(
+            Map<String, String> props, boolean isStreaming) {
+        final DescriptorProperties properties = new DescriptorProperties();
+        properties.putProperties(props);
+        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
+        final Optional<Integer> parallelism = properties.getOptionalInt("parallelism");
+        return new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism);
+    }
+
+    /** Builds a sink whose output type is the row type of the schema found in {@code props}. */
+    private CollectionTableSink getCollectionSink(Map<String, String> props) {
+        final DescriptorProperties properties = new DescriptorProperties();
+        properties.putProperties(props);
+        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
+        return new CollectionTableSink((RowTypeInfo) schema.toRowType());
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        final HashMap<String, String> context = new HashMap<>();
+        context.put(ConnectorDescriptorValidator.CONNECTOR, "COLLECTION");
+        return context;
+    }
+
+    /** Returns the wildcard {@code "*"} as the only supported property key. */
+    @Override
+    public List<String> supportedProperties() {
+        return Arrays.asList("*");
+    }
+
+    /** Source that emits the rows of {@code SOURCE_DATA} and serves lookups from {@code DIM_DATA}. */
+    private static class CollectionTableSource
+            implements StreamTableSource<Row>, LookupableTableSource<Row> {
+        private final long emitIntervalMs;
+        private final TableSchema schema;
+        private final boolean isStreaming;
+        private final Optional<Integer> parallelism;
+        private final TypeInformation<Row> rowType;
+
+        private CollectionTableSource(
+                long emitIntervalMs,
+                TableSchema schema,
+                boolean isStreaming,
+                Optional<Integer> parallelism) {
+            this.emitIntervalMs = emitIntervalMs;
+            this.schema = schema;
+            this.isStreaming = isStreaming;
+            this.parallelism = parallelism;
+            this.rowType = schema.toRowType();
+        }
+
+        @Override
+        public boolean isBounded() {
+            return !isStreaming;
+        }
+
+        /**
+         * Creates an input over {@code SOURCE_DATA}; the parallelism is only set when one was
+         * configured.
+         */
+        @Override
+        public DataStream<Row> getDataStream(StreamExecutionEnvironment streamEnv) {
+            final TestCollectionInputFormat<Row> inputFormat =
+                    new TestCollectionInputFormat<>(
+                            emitIntervalMs,
+                            SOURCE_DATA,
+                            rowType.createSerializer(new ExecutionConfig()));
+            final DataStreamSource<Row> dataStream = streamEnv.createInput(inputFormat, rowType);
+            parallelism.ifPresent(dataStream::setParallelism);
+            return dataStream;
+        }
+
+        @Override
+        public TypeInformation<Row> getReturnType() {
+            return rowType;
+        }
+
+        @Override
+        public TableSchema getTableSchema() {
+            return schema;
+        }
+
+        /**
+         * Resolves each lookup key to its position in the schema and returns a fetcher over
+         * {@code DIM_DATA}.
+         *
+         * @throws IllegalStateException if a lookup key is not a field of the schema
+         */
+        @Override
+        public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+            final List<String> schemaFieldNames = Arrays.asList(schema.getFieldNames());
+            final int[] keys = new int[lookupKeys.length];
+            for (int i = 0; i < lookupKeys.length; i++) {
+                final int fieldIndex = schemaFieldNames.indexOf(lookupKeys[i]);
+                if (fieldIndex < 0) {
+                    // name the offending key; a bare exception is hard to trace back to a test
+                    throw new IllegalStateException(
+                            "Lookup key '"
+                                    + lookupKeys[i]
+                                    + "' is not a field of the table schema "
+                                    + schemaFieldNames
+                                    + ".");
+                }
+                keys[i] = fieldIndex;
+            }
+
+            return new TemporalTableFetcher(DIM_DATA, keys);
+        }
+
+        /** Always {@code null}; {@link #isAsyncEnabled()} reports that async lookup is off. */
+        @Override
+        public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
+            return null;
+        }
+
+        @Override
+        public boolean isAsyncEnabled() {
+            return false;
+        }
+    }
+
+    /** Append sink that stores copies of all received rows in {@code RESULT}. */
+    private static class CollectionTableSink implements AppendStreamTableSink<Row> {
+        private final RowTypeInfo outputType;
+
+        private CollectionTableSink(RowTypeInfo outputType) {
+            this.outputType = outputType;
+        }
+
+        @Override
+        public RowTypeInfo getOutputType() {
+            return outputType;
+        }
+
+        @Override
+        public String[] getFieldNames() {
+            return outputType.getFieldNames();
+        }
+
+        @Override
+        public TypeInformation<?>[] getFieldTypes() {
+            return outputType.getFieldTypes();
+        }
+
+        /** Attaches the collecting sink function with a parallelism of 1. */
+        @Override
+        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+            return dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1);
+        }
+
+        /** Ignores the given names and types and returns this sink unchanged. */
+        @Override
+        public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+            return this;
+        }
+    }
+
+    /** Sink function that appends a serializer-made copy of every row to {@code RESULT}. */
+    private static class UnsafeMemorySinkFunction extends RichSinkFunction<Row> {
+        private static final long serialVersionUID = -7880686562734099699L;
+
+        private final TypeInformation<Row> outputType;
+        // created in open(...), so it is null until then
+        private TypeSerializer<Row> serializer = null;
+
+        private UnsafeMemorySinkFunction(TypeInformation<Row> outputType) {
+            this.outputType = outputType;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            serializer = outputType.createSerializer(new ExecutionConfig());
+        }
+
+        @Override
+        public void invoke(Row row, Context context) throws Exception {
+            RESULT.add(serializer.copy(row));
+        }
+    }
+
+    /** Collection input format that can pause before each end-of-input check. */
+    private static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
+
+        private static final long serialVersionUID = -3222731547793350189L;
+
+        private final long emitIntervalMs;
+
+        public TestCollectionInputFormat(
+                long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
+            super(dataSet, serializer);
+            this.emitIntervalMs = emitIntervalMs;
+        }
+
+        /**
+         * Sleeps for the configured interval (if positive) and then delegates to the parent
+         * implementation.
+         */
+        @Override
+        public boolean reachedEnd() throws IOException {
+            if (emitIntervalMs > 0) {
+                try {
+                    Thread.sleep(emitIntervalMs);
+                } catch (InterruptedException e) {
+                    // Do not fail the read, but keep the interrupt visible to the calling thread
+                    // instead of silently dropping it.
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return super.reachedEnd();
+        }
+    }
+
+    /** Lookup function that emits a copy of every dimension row matching the lookup values. */
+    private static class TemporalTableFetcher extends TableFunction<Row> {
+        private static final long serialVersionUID = 6248306950388784015L;
+
+        private final List<Row> dimData;
+        // keys[i] is the dimension-row position compared against field i of the lookup values
+        private final int[] keys;
+
+        private TemporalTableFetcher(List<Row> dimData, int[] keys) {
+            this.dimData = dimData;
+            this.keys = keys;
+        }
+
+        /**
+         * Emits a copy of each dimension row whose key fields equal the fields of {@code values}.
+         *
+         * @param values lookup values; field {@code i} is compared with key field {@code i}
+         */
+        public void eval(Row values) {
+            for (Row data : dimData) {
+                if (!matchesLookupKeys(data, values)) {
+                    continue;
+                }
+                // copy the row data
+                final Row ret = new Row(data.getArity());
+                for (int x = 0; x < data.getArity(); x++) {
+                    ret.setField(x, data.getField(x));
+                }
+                collect(ret);
+            }
+        }
+
+        /** Checks all key fields; a {@code null} dimension field never matches. */
+        private boolean matchesLookupKeys(Row data, Row values) {
+            for (int idx = 0; idx < keys.length; idx++) {
+                final Object dimField = data.getField(keys[idx]);
+                // guard against null so that a null key does not abort the lookup with an NPE
+                if (dimField == null || !dimField.equals(values.getField(idx))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
new file mode 100644
index 0000000..449aea7
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Retract table sink for tests that forwards the {@code (flag, row)} stream unchanged to a
+ * {@link RowSink}.
+ */
+public class TestRetractSink implements RetractStreamTableSink<Row> {
+
+    // Both stay null until configure(...) produces a configured copy.
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    /** Attaches a fresh {@link RowSink} directly to the incoming stream. */
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        final RowSink rowSink = new RowSink();
+        return dataStream.addSink(rowSink);
+    }
+
+    /** Builds the row type from the configured field types and field names. */
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    /** Returns a new sink carrying the given names and types; this instance is left untouched. */
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(
+            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        final TestRetractSink configured = new TestRetractSink();
+        configured.fNames = fieldNames;
+        configured.fTypes = fieldTypes;
+        return configured;
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
new file mode 100644
index 0000000..1740d53
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.types.Row;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Upsert table sink for tests.
+ *
+ * <p>It asserts that the key fields and the append-only flag it is given match the expectations
+ * passed to the constructor, and forwards the {@code (flag, row)} stream to a {@link RowSink}.
+ */
+public class TestUpsertSink implements UpsertStreamTableSink<Row> {
+
+    // Both stay null until configure(...) produces a configured copy.
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    private final String[] expectedKeys;
+    private final boolean expectedIsAppendOnly;
+
+    /**
+     * @param expectedKeys key fields that {@link #setKeyFields} must receive, in any order
+     * @param expectedIsAppendOnly value that {@link #setIsAppendOnly} must receive
+     */
+    public TestUpsertSink(String[] expectedKeys, boolean expectedIsAppendOnly) {
+        this.expectedKeys = expectedKeys;
+        this.expectedIsAppendOnly = expectedIsAppendOnly;
+    }
+
+    /** Asserts that {@code keys} holds exactly the expected keys, regardless of order. */
+    @Override
+    public void setKeyFields(String[] keys) {
+        assertThat(keys)
+                .as("Provided key fields do not match expected keys")
+                .containsExactlyInAnyOrder(expectedKeys);
+    }
+
+    /** Asserts that {@code isAppendOnly} equals the expected flag. */
+    @Override
+    public void setIsAppendOnly(Boolean isAppendOnly) {
+        assertThat(isAppendOnly)
+                .as("Provided isAppendOnly does not match expected isAppendOnly")
+                .isEqualTo(expectedIsAppendOnly);
+    }
+
+    /** Builds the row type from the configured field types and field names. */
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    /** Attaches a fresh {@link RowSink} directly to the incoming stream. */
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        final RowSink rowSink = new RowSink();
+        return dataStream.addSink(rowSink);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    /** Returns a new sink with the same expectations and the given names and types. */
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(
+            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        final TestUpsertSink configured = new TestUpsertSink(expectedKeys, expectedIsAppendOnly);
+        configured.fNames = fieldNames;
+        configured.fTypes = fieldTypes;
+        return configured;
+    }
+}
diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
deleted file mode 100644
index 0d8e03a..0000000
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.CollectionInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory}
-import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
-import org.apache.flink.table.legacyutils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}
-import org.apache.flink.types.Row
-
-import java.io.IOException
-import java.util
-import java.util.{Optional, ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
-
-import scala.collection.JavaConversions._
-
-/**
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-@deprecated
-class TestCollectionTableFactory
-  extends StreamTableSourceFactory[Row]
-  with StreamTableSinkFactory[Row]
-{
-
-  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
-    getCollectionSource(properties, TestCollectionTableFactory.isStreaming)
-  }
-
-  override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
-    getCollectionSource(properties, isStreaming = true)
-  }
-
-  override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def requiredContext(): JMap[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR, "COLLECTION")
-    context
-  }
-
-  override def supportedProperties(): JList[String] = {
-    val supported = new JArrayList[String]()
-    supported.add("*")
-    supported
-  }
-}
-
-@deprecated
-object TestCollectionTableFactory {
-  var isStreaming: Boolean = true
-
-  val SOURCE_DATA = new JLinkedList[Row]()
-  val DIM_DATA = new JLinkedList[Row]()
-  val RESULT = new JLinkedList[Row]()
-  private var emitIntervalMS = -1L
-
-  def initData(sourceData: JList[Row],
-      dimData: JList[Row] = List(),
-      emitInterval: Long = -1L): Unit ={
-    SOURCE_DATA.addAll(sourceData)
-    DIM_DATA.addAll(dimData)
-    emitIntervalMS = emitInterval
-  }
-
-  def reset(): Unit ={
-    RESULT.clear()
-    SOURCE_DATA.clear()
-    DIM_DATA.clear()
-    emitIntervalMS = -1L
-  }
-
-  def getCollectionSource(props: JMap[String, String],
-      isStreaming: Boolean): CollectionTableSource = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    val parallelism = properties.getOptionalInt("parallelism")
-    new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism)
-  }
-
-  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
-  }
-
-  /**
-    * Table source of collection.
-    */
-  class CollectionTableSource(
-      val emitIntervalMs: Long,
-      val schema: TableSchema,
-      val isStreaming: Boolean,
-      val parallelism: Optional[Integer])
-    extends StreamTableSource[Row]
-    with LookupableTableSource[Row] {
-
-    private val rowType: TypeInformation[Row] = schema.toRowType
-
-    override def isBounded: Boolean = !isStreaming
-
-    override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = {
-      val dataStream = streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
-        SOURCE_DATA,
-        rowType.createSerializer(new ExecutionConfig)),
-        rowType)
-      if (parallelism.isPresent) {
-        dataStream.setParallelism(parallelism.get())
-      }
-      dataStream
-    }
-
-    override def getReturnType: TypeInformation[Row] = rowType
-
-    override def getTableSchema: TableSchema = {
-      schema
-    }
-
-    override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = {
-      new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_)))
-    }
-
-    override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null
-
-    override def isAsyncEnabled: Boolean = false
-  }
-
-  /**
-    * Table sink of collection.
-    */
-  class CollectionTableSink(val outputType: RowTypeInfo)
-      extends AppendStreamTableSink[Row] {
-
-    override def getOutputType: RowTypeInfo = outputType
-
-    override def getFieldNames: Array[String] = outputType.getFieldNames
-
-    override def getFieldTypes: Array[TypeInformation[_]] = {
-      outputType.getFieldTypes
-    }
-
-    override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
-      dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
-    }
-
-    override def configure(fieldNames: Array[String],
-        fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
-  }
-
-  /**
-    * Sink function of unsafe memory.
-    */
-  class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] {
-    private var serializer: TypeSerializer[Row] = _
-
-    override def open(param: Configuration): Unit = {
-      serializer = outputType.createSerializer(new ExecutionConfig)
-    }
-
-    @throws[Exception]
-    override def invoke(row: Row): Unit = {
-      RESULT.add(serializer.copy(row))
-    }
-  }
-
-  /**
-    * Collection inputFormat for testing.
-    */
-  class TestCollectionInputFormat[T](
-      val emitIntervalMs: Long,
-      val dataSet: java.util.Collection[T],
-      val serializer: TypeSerializer[T])
-    extends CollectionInputFormat[T](dataSet, serializer) {
-    @throws[IOException]
-    override def reachedEnd: Boolean = {
-      if (emitIntervalMs > 0) {
-        try
-          Thread.sleep(emitIntervalMs)
-        catch {
-          case _: InterruptedException =>
-        }
-      }
-      super.reachedEnd
-    }
-  }
-
-  /**
-    * Dimension table source fetcher.
-    */
-  class TemporalTableFetcher(
-      val dimData: JLinkedList[Row],
-      val keys: Array[Int]) extends TableFunction[Row] {
-
-    @throws[Exception]
-    def eval(values: Any*): Unit = {
-      for (data <- dimData) {
-        var matched = true
-        var idx = 0
-        while (matched && idx < keys.length) {
-          val dimField = data.getField(keys(idx))
-          val inputField = values(idx)
-          matched = dimField.equals(inputField)
-          idx += 1
-        }
-        if (matched) {
-          // copy the row data
-          val ret = new Row(data.getArity)
-          0 until data.getArity foreach { idx =>
-            ret.setField(idx, data.getField(idx))
-          }
-          collect(ret)
-        }
-      }
-    }
-  }
-}
diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
deleted file mode 100644
index f08d214..0000000
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, FieldReferenceExpression, ResolvedFieldReference}
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import org.apache.flink.table.sources.tsextractors.TimestampExtractor
-import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
-import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.types.Row
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-class CustomAssigner extends PunctuatedWatermarkAssigner() {
-  override def getWatermark(row: Row, timestamp: Long): Watermark =
-    throw new UnsupportedOperationException()
-}
-
-@deprecated
-class CustomExtractor(val field: String) extends TimestampExtractor {
-  def this() = {
-    this("ts")
-  }
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
-    argumentFieldTypes(0) match {
-      case Types.SQL_TIMESTAMP =>
-      case _ =>
-        throw new ValidationException(
-          s"Field 'ts' must be of type Timestamp but is of type ${argumentFieldTypes(0)}.")
-    }
-  }
-
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-    val fieldAccess = fieldAccesses(0)
-    require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
-    val fieldReferenceExpr = new FieldReferenceExpression(
-      fieldAccess.name,
-      TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType),
-      0,
-      fieldAccess.fieldIndex)
-    ApiExpressionUtils.unresolvedCall(
-      BuiltInFunctionDefinitions.CAST,
-      fieldReferenceExpr,
-      ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()))
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CustomExtractor => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-}
diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
deleted file mode 100644
index 9c032ca..0000000
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction, TableFunction}
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-
-import java.lang.{Iterable => JIterable}
-import org.junit.Assert
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-class RichFunc0 extends ScalarFunction {
-  var openCalled = false
-  var closeCalled = false
-
-  override def open(context: FunctionContext): Unit = {
-    super.open(context)
-    if (openCalled) {
-      Assert.fail("Open called more than once.")
-    } else {
-      openCalled = true
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before open.")
-    }
-  }
-
-  def eval(index: Int): Int = {
-    if (!openCalled) {
-      Assert.fail("Open was not called before eval.")
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before eval.")
-    }
-
-    index + 1
-  }
-
-  override def close(): Unit = {
-    super.close()
-    if (closeCalled) {
-      Assert.fail("Close called more than once.")
-    } else {
-      closeCalled = true
-    }
-    if (!openCalled) {
-      Assert.fail("Open was not called before close.")
-    }
-  }
-}
-
-@deprecated
-class MaxAccumulator[T] extends JTuple2[T, Boolean]
-
-@deprecated
-abstract class MaxAggFunction[T](implicit ord: Ordering[T])
-  extends AggregateFunction[T, MaxAccumulator[T]] {
-
-  override def createAccumulator(): MaxAccumulator[T] = {
-    val acc = new MaxAccumulator[T]
-    acc.f0 = getInitValue
-    acc.f1 = false
-    acc
-  }
-
-  def accumulate(acc: MaxAccumulator[T], value: Any): Unit = {
-    if (value != null) {
-      val v = value.asInstanceOf[T]
-      if (!acc.f1 || ord.compare(acc.f0, v) < 0) {
-        acc.f0 = v
-        acc.f1 = true
-      }
-    }
-  }
-
-  override def getValue(acc: MaxAccumulator[T]): T = {
-    if (acc.f1) {
-      acc.f0
-    } else {
-      null.asInstanceOf[T]
-    }
-  }
-
-  def merge(acc: MaxAccumulator[T], its: JIterable[MaxAccumulator[T]]): Unit = {
-    val iter = its.iterator()
-    while (iter.hasNext) {
-      val a = iter.next()
-      if (a.f1) {
-        accumulate(acc, a.f0)
-      }
-    }
-  }
-
-  def resetAccumulator(acc: MaxAccumulator[T]): Unit = {
-    acc.f0 = getInitValue
-    acc.f1 = false
-  }
-
-  override def getAccumulatorType: TypeInformation[MaxAccumulator[T]] = {
-    new TupleTypeInfo(
-      classOf[MaxAccumulator[T]],
-      getValueTypeInfo,
-      BasicTypeInfo.BOOLEAN_TYPE_INFO)
-  }
-
-  def getInitValue: T
-
-  def getValueTypeInfo: TypeInformation[_]
-}
-
-@deprecated
-class ByteMaxAggFunction extends MaxAggFunction[Byte] {
-  override def getInitValue: Byte = 0.toByte
-  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
-}
-
-@deprecated
-class TableFunc1 extends TableFunction[String] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")){
-      str.split("#").foreach(collect)
-    }
-  }
-
-  def eval(str: String, prefix: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach(s => collect(prefix + s))
-    }
-  }
-}
diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala
deleted file mode 100644
index 871f6a1..0000000
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.types.Row
-
-import java.lang.{Boolean => JBool}
-
-import scala.collection.mutable
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
-    dataStream.map(
-      new MapFunction[Row, JTuple2[JBool, Row]] {
-        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
-      })
-      .addSink(new RowSink)
-  }
-
-  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
-    val copy = new TestAppendSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = {
-    s.addSink(new RowSink)
-  }
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
-    val copy = new TestRetractSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-private[flink] class TestUpsertSink(
-    expectedKeys: Array[String],
-    expectedIsAppendOnly: Boolean)
-  extends UpsertStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def setKeyFields(keys: Array[String]): Unit =
-    if (keys != null) {
-      if (!expectedKeys.sorted.mkString(",").equals(keys.sorted.mkString(","))) {
-        throw new AssertionError("Provided key fields do not match expected keys")
-      }
-    } else {
-      if (expectedKeys != null) {
-        throw new AssertionError("Provided key fields should not be null.")
-      }
-    }
-
-  override def setIsAppendOnly(isAppendOnly: JBool): Unit =
-    if (expectedIsAppendOnly != isAppendOnly) {
-      throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly")
-    }
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = {
-    s.addSink(new RowSink)
-  }
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
-    val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
-  override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value)
-}
-
-@deprecated
-object RowCollector {
-  private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
-    new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
-
-  def addValue(value: JTuple2[JBool, Row]): Unit = {
-
-    // make a copy
-    val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
-    sink.synchronized {
-      sink += copy
-    }
-  }
-
-  def getAndClearValues: List[JTuple2[JBool, Row]] = {
-    val out = sink.toList
-    sink.clear()
-    out
-  }
-
-  /** Converts a list of retraction messages into a list of final results. */
-  def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = {
-
-    val retracted = results
-      .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) =>
-        val cnt = m.getOrElse(v.f1.toString, 0)
-        if (v.f0) {
-          m + (v.f1.toString -> (cnt + 1))
-        } else {
-          m + (v.f1.toString -> (cnt - 1))
-        }
-      }.filter{ case (_, c: Int) => c != 0 }
-
-    if (retracted.exists{ case (_, c: Int) => c < 0}) {
-      throw new AssertionError("Received retracted rows which have not been accumulated.")
-    }
-
-    retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
-  }
-
-  /** Converts a list of upsert messages into a list of final results. */
-  def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
-
-    def getKeys(r: Row): Row = Row.project(r, keys)
-
-    val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
-      val key = getKeys(r.f1)
-      if (r.f0) {
-        o + (key -> r.f1.toString)
-      } else {
-        o - key
-      }
-    }
-
-    upserted.values.toList
-  }
-}