You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2020/09/24 18:16:16 UTC

[bahir-flink] branch master updated: [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b3e1e  [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
f0b3e1e is described below

commit f0b3e1e04930b79b277cfc7ebe3552db246578e9
Author: Gyula Fora <gy...@apache.org>
AuthorDate: Thu Sep 24 20:16:10 2020 +0200

    [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
    
    Co-authored-by: Gyula Fora <gy...@cloudera.com>
---
 .travis.yml                                        |  4 +-
 flink-connector-flume/pom.xml                      | 21 +++--
 flink-connector-kudu/pom.xml                       | 32 ++++---
 .../kudu/connector/reader/KuduReader.java          |  2 +-
 .../kudu/connector/writer/KuduWriter.java          |  2 +-
 .../connectors/kudu/table/KuduTableFactory.java    | 55 +++---------
 .../flink/connectors/kudu/table/KuduTableSink.java |  3 -
 ...utFormatTest.java => KuduOutputFormatTest.java} |  4 +-
 .../connectors/kudu/streaming/KuduSinkTest.java    |  2 +-
 .../connectors/kudu/table/KuduCatalogTest.java     | 98 +++++++++++++---------
 .../kudu/table/KuduTableFactoryTest.java           | 89 +++++++++++---------
 .../kudu/table/KuduTableSourceITCase.java          | 16 ++--
 .../connectors/kudu/table/KuduTableSourceTest.java |  5 +-
 .../connectors/kudu/table/KuduTableTestUtils.java  |  2 +-
 .../{log4j.properties => log4j2-test.properties}   | 17 ++--
 .../netty/example/StreamSqlExample.scala           |  6 +-
 .../streaming/connectors/redis/RedisTableSink.java |  5 --
 .../connectors/redis/RedisDescriptorTest.java      |  4 +-
 pom.xml                                            |  4 +-
 19 files changed, 193 insertions(+), 178 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 96f702d..4a473f5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -34,8 +34,8 @@ jdk:
   - openjdk8
 
 env:
-  - FLINK_VERSION="1.10.0" SCALA_VERSION="2.11"
-  - FLINK_VERSION="1.10.0" SCALA_VERSION="2.12"
+  - FLINK_VERSION="1.11.2" SCALA_VERSION="2.11"
+  - FLINK_VERSION="1.11.2" SCALA_VERSION="2.12"
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index d1e0f2a..984d2f9 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -53,13 +53,20 @@ under the License.
 			<version>${flume-ng.version}</version>
 		</dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<groupId>org.testcontainers</groupId>
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index bbf168e..a76102e 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <kudu.version>1.11.1</kudu.version>
+    <kudu.version>1.13.0</kudu.version>
     <mockito.version>1.10.19</mockito.version>
   </properties>
 
@@ -79,6 +79,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-clients_${scala.binary.version}</artifactId>
+        <version>${flink.version}</version>
+        <scope>test</scope>
+    </dependency>
+
     <!-- this is added because test cluster use @Rule from junit4 -->
     <dependency>
       <groupId>org.junit.jupiter</groupId>
@@ -95,17 +102,22 @@
     </dependency>
 
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>runtime</scope>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j2.version}</version>
+    	<scope>test</scope>
     </dependency>
-
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <version>${log4j.version}</version>
-      <scope>runtime</scope>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j2.version}</version>
+    	<scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+      <scope>test</scope>
     </dependency>
 </dependencies>
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 51ab748..d7a0c61 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -82,7 +82,7 @@ public class KuduReader implements AutoCloseable {
         if (tableInfo.getCreateTableIfNotExists()) {
             return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
         }
-        throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+        throw new RuntimeException("Table " + tableName + " does not exist.");
     }
 
     public KuduReaderIterator scanner(byte[] token) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 7233478..03c37ea 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -83,7 +83,7 @@ public class KuduWriter<T> implements AutoCloseable {
         if (tableInfo.getCreateTableIfNotExists()) {
             return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
         }
-        throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+        throw new RuntimeException("Table " + tableName + " does not exist.");
     }
 
     public void write(T input) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index eb72205..1961aad 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.types.Row;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,25 +37,10 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
 
@@ -87,7 +71,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
         properties.add(SCHEMA + ".#." + SCHEMA_NAME);
         properties.add(SCHEMA + ".#." + SCHEMA_FROM);
         // computed column
-        properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+        properties.add(SCHEMA + ".#." + EXPR);
 
         // time attributes
         properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -107,9 +91,10 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
         return properties;
     }
 
-    private DescriptorProperties getValidatedProps(Map<String, String> properties) {
+    private DescriptorProperties validateTable(CatalogTable table) {
+        Map<String, String> properties = table.toProperties();
         checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS);
-        checkNotNull(properties.get(KUDU_TABLE), "Missing required property " + KUDU_TABLE);
+
         final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
         descriptorProperties.putProperties(properties);
         new SchemaValidator(true, false, false).validate(descriptorProperties);
@@ -117,16 +102,9 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
     }
 
     @Override
-    public KuduTableSource createTableSource(Map<String, String> properties) {
-        DescriptorProperties descriptorProperties = getValidatedProps(properties);
-        String tableName = descriptorProperties.getString(KUDU_TABLE);
-        TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
-        return createTableSource(tableName, schema, properties);
-    }
-
-    @Override
     public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
-        String tableName = tablePath.getObjectName();
+        validateTable(table);
+        String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
         return createTableSource(tableName, table.getSchema(), table.getProperties());
     }
 
@@ -142,17 +120,10 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
     }
 
     @Override
-    public KuduTableSink createTableSink(Map<String, String> properties) {
-        DescriptorProperties descriptorProperties = getValidatedProps(properties);
-        String tableName = descriptorProperties.getString(KUDU_TABLE);
-        TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
-
-        return createTableSink(tableName, schema, properties);
-    }
-
-    @Override
     public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
-        return createTableSink(tablePath.getObjectName(), table.getSchema(), table.getProperties());
+        validateTable(table);
+        String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
+        return createTableSink(tableName, table.getSchema(), table.getProperties());
     }
 
     private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
index 8ff517a..99325c0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -52,9 +52,6 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> {
     public TypeInformation<Row> getRecordType() { return flinkSchema.toRowType(); }
 
     @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { consumeDataStream(dataStreamTuple); }
-
-    @Override
     public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) {
         KuduSink upsertKuduSink = new KuduSink(writerConfigBuilder.build(), tableInfo, new UpsertOperationMapper(getTableSchema().getFieldNames()));
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
similarity index 96%
rename from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
index abf8a30..22fa0a4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.UUID;
 
-class KuduOuputFormatTest extends KuduTestBase {
+class KuduOutputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidKuduMaster() {
@@ -50,7 +50,7 @@ class KuduOuputFormatTest extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
         KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
-        Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
+        Assertions.assertThrows(RuntimeException.class, () -> outputFormat.open(0, 1));
     }
 
     @Test
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 077306d..4d74fda 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -73,7 +73,7 @@ public class KuduSinkTest extends KuduTestBase {
         KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
         sink.setRuntimeContext(context);
-        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+        Assertions.assertThrows(RuntimeException.class, () -> sink.open(new Configuration()));
     }
 
     @Test
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 3cef2d8..4bb1871 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 
@@ -52,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.jupiter.api.Assertions.*;
@@ -72,78 +73,89 @@ public class KuduCatalogTest extends KuduTestBase {
 
     @Test
     public void testCreateAlterDrop() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')");
+        tableEnv.executeSql("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.executeSql("INSERT INTO TestTable1 VALUES ('f', 's')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         // Add this once Primary key support has been enabled
         // tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')");
         // tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')");
 
-        tableEnv.execute("test");
 
         validateSingleKey("TestTable1");
         // validateSingleKey("TestTable2");
 
-        tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R");
+        tableEnv.executeSql("ALTER TABLE TestTable1 RENAME TO TestTable1R");
         validateSingleKey("TestTable1R");
 
-        tableEnv.sqlUpdate("DROP TABLE TestTable1R");
+        tableEnv.executeSql("DROP TABLE TestTable1R");
         assertFalse(harness.getClient().tableExists("TestTable1R"));
     }
 
     @Test
     public void testCreateAndInsertMultiKey() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')");
-
-        // Add this once Primary key support has been enabled
-        // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')");
-        // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')");
-
-        tableEnv.execute("test");
+        tableEnv.executeSql("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
+        tableEnv.executeSql("INSERT INTO TestTable3 VALUES ('f', 2, 't')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         validateMultiKey("TestTable3");
-        // validateMultiKey("TestTable4");
     }
 
     @Test
     public void testSourceProjection() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')");
-        tableEnv.execute("test");
-
-        tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM  TestTable5)");
-        tableEnv.execute("test");
+        tableEnv.executeSql("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
+        tableEnv.executeSql("INSERT INTO TestTable5 VALUES ('s', 'f', 't')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
+
+        tableEnv.executeSql("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.executeSql("INSERT INTO TestTable6 (SELECT `first`, `second` FROM  TestTable5)")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         validateSingleKey("TestTable6");
     }
 
     @Test
     public void testEmptyProjection() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')");
-        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')");
-        tableEnv.execute("test");
+        CollectionSink.output.clear();
+        tableEnv.executeSql("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f','s')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
+        tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f2','s2')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
 
         DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG));
 
-        CollectionSink.output.clear();
-
         resultDataStream
                 .map(t -> Tuple2.of(t.f0, t.f1.getField(0)))
                 .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
                 .addSink(new CollectionSink<>()).setParallelism(1);
 
-        tableEnv.execute("test");
+        resultDataStream.getExecutionEnvironment().execute();
 
         List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L));
 
         assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
         CollectionSink.output.clear();
-
     }
 
     @Test
@@ -208,10 +220,13 @@ public class KuduCatalogTest extends KuduTestBase {
 
     @Test
     public void testTimestamp() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
+        tableEnv.executeSql("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
                 "WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
-        tableEnv.execute("test");
+        tableEnv.executeSql("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
         assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
@@ -227,29 +242,32 @@ public class KuduCatalogTest extends KuduTestBase {
 
     @Test
     public void testDatatypes() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
+        tableEnv.executeSql("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
                 "`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " +
                 "`tenth` TIMESTAMP)" +
                 "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
 
-        tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
+        tableEnv.executeSql("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
                 "cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," +
-                "TIMESTAMP '2020-04-15 12:34:56.123') ");
+                "TIMESTAMP '2020-04-15 12:34:56.123') ")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
-        tableEnv.execute("test");
         validateManyTypes("TestTable8");
     }
 
     @Test
     public void testMissingPropertiesCatalog() throws Exception {
         assertThrows(TableException.class,
-                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
+                () -> tableEnv.executeSql("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
                         "WITH ('kudu.primary-key-columns' = 'second')"));
         assertThrows(TableException.class,
-                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+                () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
                         "WITH ('kudu.hash-columns' = 'first')"));
         assertThrows(TableException.class,
-                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+                () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
                         "WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')"));
     }
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index 3a2f776..d852f8e 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -17,8 +17,11 @@
 package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduTable;
@@ -31,9 +34,10 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class KuduTableFactoryTest extends KuduTestBase {
     private StreamTableEnvironment tableEnv;
@@ -47,56 +51,59 @@ public class KuduTableFactoryTest extends KuduTestBase {
     }
 
     @Test
-    public void testMissingTable() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
-                "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')");
-        assertThrows(NullPointerException.class,
-                () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
-    }
-
-    @Test
     public void testMissingMasters() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
                 "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
-        assertThrows(NullPointerException.class,
-                () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+        assertThrows(TableException.class,
+                () -> tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)"));
     }
 
     @Test
     public void testNonExistingTable() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
                 "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)");
-        assertThrows(java.util.concurrent.ExecutionException.class,
-                () -> tableEnv.execute("test"));
+        JobClient jobClient = tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)").getJobClient().get();
+        try {
+            jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+            fail();
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof JobExecutionException);
+        }
     }
 
     @Test
     public void testCreateTable() throws Exception {
-        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
+        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
                 "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
                 "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')");
-        tableEnv.execute("test");
+
+        tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 's')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         validateSingleKey("TestTable11");
     }
+
     @Test
     public void testTimestamp() throws Exception {
         // Timestamp should be bridged to sql.Timestamp
         // Test it when creating the table...
-        tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
-                "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " +
+        tableEnv.executeSql("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
+                "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "', " +
                 "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
-        tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
-        tableEnv.execute("test");
-
-        // And also when inserting into existing table
-        tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " +
-                "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')");
-
-        tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')");
-        tableEnv.execute("test");
+        tableEnv.executeSql("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
+
+        tableEnv.executeSql("INSERT INTO TestTableTs values ('s', TIMESTAMP '2020-02-02 23:23:23')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
         assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
@@ -115,18 +122,24 @@ public class KuduTableFactoryTest extends KuduTestBase {
     @Test
     public void testExistingTable() throws Exception {
         // Creating a table
-        tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
+        tableEnv.executeSql("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
                 "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
                 "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
 
-        tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')");
-        tableEnv.execute("test");
+        tableEnv.executeSql("INSERT INTO TestTable12 values ('f', 's')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         // Then another one in SQL that refers to the previously created one
-        tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
+        tableEnv.executeSql("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
                 "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
-        tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')");
-        tableEnv.execute("test2");
+        tableEnv.executeSql("INSERT INTO TestTable12b values ('f2','s2')")
+                .getJobClient()
+                .get()
+                .getJobExecutionResult(getClass().getClassLoader())
+                .get(1, TimeUnit.MINUTES);
 
         // Validate that both insertions were into the same table
         KuduTable kuduTable = harness.getClient().openTable("TestTable12");
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index f5939fc..b83497c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -18,16 +18,16 @@ package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
-import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Integration tests for {@link KuduTableSource}.
@@ -48,8 +48,9 @@ public class KuduTableSourceITCase extends KuduTestBase {
 
     @Test
     void testFullBatchScan() throws Exception {
-        Table query = tableEnv.sqlQuery("select * from books order by id");
-        List<Row> results = TableUtils.collectToList(query);
+        CloseableIterator<Row> it = tableEnv.executeSql("select * from books order by id").collect();
+        List<Row> results = new ArrayList<>();
+        it.forEachRemaining(results::add);
         assertEquals(5, results.size());
         assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString());
         tableEnv.sqlUpdate("DROP TABLE books");
@@ -58,8 +59,9 @@ public class KuduTableSourceITCase extends KuduTestBase {
     @Test
     void testScanWithProjectionAndFilter() throws Exception {
         // (price > 30 and price < 40)
-        Table table = tableEnv.sqlQuery("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40");
-        List<Row> results = TableUtils.collectToList(table);
+        CloseableIterator<Row> it = tableEnv.executeSql("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40").collect();
+        List<Row> results = new ArrayList<>();
+        it.forEachRemaining(results::add);
         assertEquals(1, results.size());
         assertEquals("More Java for more dummies", results.get(0).toString());
         tableEnv.sqlUpdate("DROP TABLE books");
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index f4bb6ae..2dfb71b 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -121,8 +121,7 @@ public class KuduTableSourceTest extends KuduTestBase {
         // expressions for supported predicates
         FieldReferenceExpression fieldReferenceExpression = new FieldReferenceExpression(
             "id", DataTypes.INT(), 0, 0);
-        ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(
-            1, DataTypes.INT());
+        ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(1);
         List<ResolvedExpression> args = new ArrayList<>(
             Arrays.asList(fieldReferenceExpression, valueLiteralExpression));
         Expression supportedPred = new CallExpression(
@@ -132,7 +131,7 @@ public class KuduTableSourceTest extends KuduTestBase {
         // unsupported predicate
         Expression unsupportedPred = new CallExpression(
             new ScalarFunctionDefinition("dummy", DUMMY_FUNCTION),
-            singletonList(new ValueLiteralExpression(1, DataTypes.INT())),
+            singletonList(new ValueLiteralExpression(1)),
             DataTypes.INT());
         // invalid predicate
         Expression invalidPred = new CallExpression(
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index 54854fb..4eae7bf 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -19,7 +19,7 @@ package org.apache.flink.connectors.kudu.table;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 
diff --git a/flink-connector-kudu/src/test/resources/log4j.properties b/flink-connector-kudu/src/test/resources/log4j2-test.properties
similarity index 70%
rename from flink-connector-kudu/src/test/resources/log4j.properties
rename to flink-connector-kudu/src/test/resources/log4j2-test.properties
index 15efe08..e463a0e 100644
--- a/flink-connector-kudu/src/test/resources/log4j.properties
+++ b/flink-connector-kudu/src/test/resources/log4j2-test.properties
@@ -16,12 +16,13 @@
 # limitations under the License.
 ################################################################################
 
-# This file ensures that tests executed from the IDE show log output
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
 
-log4j.rootLogger=WARN, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
index 9f4d0be..08b5068 100644
--- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.netty.example
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.bridge.scala._
 
 /**
  * Simple example for demonstrating the use of SQL on a Stream Table.
@@ -55,8 +55,8 @@ object StreamSqlExample {
         val tk = line.split(",")
         Order(tk.head.trim.toLong, tk(1), tk(2).trim.toInt)
       }
-    // register the DataStreams under the name "OrderA" and "OrderB"
-    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+
+    tEnv.createTemporaryView("OrderA", orderA)
 
     // union the two tables
     val result = tEnv.sqlQuery("SELECT STREAM * FROM OrderA WHERE amount > 2")
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
index 8f1571b..59ab911 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -74,11 +74,6 @@ public class RedisTableSink implements UpsertStreamTableSink<Row> {
 
 
     @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        consumeDataStream(dataStream);
-    }
-
-    @Override
     public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
         return new RedisTableSink(getProperties());
     }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 97293d0..9b52d6a 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
 import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.types.Row;
 import org.junit.Before;
@@ -66,7 +66,7 @@ public class RedisDescriptorTest extends  RedisITCaseBase{
                 .connect(redis).withSchema(new Schema()
                 .field("k", TypeInformation.of(String.class))
                 .field("v", TypeInformation.of(Long.class)))
-                .registerTableSink("redis");
+                .createTemporaryTable("redis");
 
 
         tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
diff --git a/pom.xml b/pom.xml
index 126e79d..109b528 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,9 +93,10 @@
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
+    <log4j2.version>2.13.3</log4j2.version>
 
     <!-- Flink version -->
-    <flink.version>1.10.0</flink.version>
+    <flink.version>1.11.2</flink.version>
 
     <junit.jupiter.version>5.4.1</junit.jupiter.version>
 
@@ -784,4 +785,3 @@
 
   </profiles>
 </project>
-