You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2020/09/24 18:16:16 UTC
[bahir-flink] branch master updated: [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new f0b3e1e [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
f0b3e1e is described below
commit f0b3e1e04930b79b277cfc7ebe3552db246578e9
Author: Gyula Fora <gy...@apache.org>
AuthorDate: Thu Sep 24 20:16:10 2020 +0200
[BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
Co-authored-by: Gyula Fora <gy...@cloudera.com>
---
.travis.yml | 4 +-
flink-connector-flume/pom.xml | 21 +++--
flink-connector-kudu/pom.xml | 32 ++++---
.../kudu/connector/reader/KuduReader.java | 2 +-
.../kudu/connector/writer/KuduWriter.java | 2 +-
.../connectors/kudu/table/KuduTableFactory.java | 55 +++---------
.../flink/connectors/kudu/table/KuduTableSink.java | 3 -
...utFormatTest.java => KuduOutputFormatTest.java} | 4 +-
.../connectors/kudu/streaming/KuduSinkTest.java | 2 +-
.../connectors/kudu/table/KuduCatalogTest.java | 98 +++++++++++++---------
.../kudu/table/KuduTableFactoryTest.java | 89 +++++++++++---------
.../kudu/table/KuduTableSourceITCase.java | 16 ++--
.../connectors/kudu/table/KuduTableSourceTest.java | 5 +-
.../connectors/kudu/table/KuduTableTestUtils.java | 2 +-
.../{log4j.properties => log4j2-test.properties} | 17 ++--
.../netty/example/StreamSqlExample.scala | 6 +-
.../streaming/connectors/redis/RedisTableSink.java | 5 --
.../connectors/redis/RedisDescriptorTest.java | 4 +-
pom.xml | 4 +-
19 files changed, 193 insertions(+), 178 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 96f702d..4a473f5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -34,8 +34,8 @@ jdk:
- openjdk8
env:
- - FLINK_VERSION="1.10.0" SCALA_VERSION="2.11"
- - FLINK_VERSION="1.10.0" SCALA_VERSION="2.12"
+ - FLINK_VERSION="1.11.2" SCALA_VERSION="2.11"
+ - FLINK_VERSION="1.11.2" SCALA_VERSION="2.12"
before_install:
- ./dev/change-scala-version.sh $SCALA_VERSION
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index d1e0f2a..984d2f9 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -53,13 +53,20 @@ under the License.
<version>${flume-ng.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index bbf168e..a76102e 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
<packaging>jar</packaging>
<properties>
- <kudu.version>1.11.1</kudu.version>
+ <kudu.version>1.13.0</kudu.version>
<mockito.version>1.10.19</mockito.version>
</properties>
@@ -79,6 +79,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- this is added because test cluster use @Rule from junit4 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -95,17 +102,22 @@
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- <scope>runtime</scope>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
</dependency>
-
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
- <scope>runtime</scope>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 51ab748..d7a0c61 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -82,7 +82,7 @@ public class KuduReader implements AutoCloseable {
if (tableInfo.getCreateTableIfNotExists()) {
return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
}
- throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+ throw new RuntimeException("Table " + tableName + " does not exist.");
}
public KuduReaderIterator scanner(byte[] token) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 7233478..03c37ea 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -83,7 +83,7 @@ public class KuduWriter<T> implements AutoCloseable {
if (tableInfo.getCreateTableIfNotExists()) {
return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
}
- throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+ throw new RuntimeException("Table " + tableName + " does not exist.");
}
public void write(T input) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index eb72205..1961aad 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.types.Row;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,25 +37,10 @@ import java.util.List;
import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
+import static org.apache.flink.util.Preconditions.checkNotNull;
public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
@@ -87,7 +71,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// time attributes
properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -107,9 +91,10 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
return properties;
}
- private DescriptorProperties getValidatedProps(Map<String, String> properties) {
+ private DescriptorProperties validateTable(CatalogTable table) {
+ Map<String, String> properties = table.toProperties();
checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS);
- checkNotNull(properties.get(KUDU_TABLE), "Missing required property " + KUDU_TABLE);
+
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
new SchemaValidator(true, false, false).validate(descriptorProperties);
@@ -117,16 +102,9 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
}
@Override
- public KuduTableSource createTableSource(Map<String, String> properties) {
- DescriptorProperties descriptorProperties = getValidatedProps(properties);
- String tableName = descriptorProperties.getString(KUDU_TABLE);
- TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
- return createTableSource(tableName, schema, properties);
- }
-
- @Override
public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
- String tableName = tablePath.getObjectName();
+ validateTable(table);
+ String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
return createTableSource(tableName, table.getSchema(), table.getProperties());
}
@@ -142,17 +120,10 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
}
@Override
- public KuduTableSink createTableSink(Map<String, String> properties) {
- DescriptorProperties descriptorProperties = getValidatedProps(properties);
- String tableName = descriptorProperties.getString(KUDU_TABLE);
- TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
-
- return createTableSink(tableName, schema, properties);
- }
-
- @Override
public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
- return createTableSink(tablePath.getObjectName(), table.getSchema(), table.getProperties());
+ validateTable(table);
+ String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
+ return createTableSink(tableName, table.getSchema(), table.getProperties());
}
private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
index 8ff517a..99325c0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -52,9 +52,6 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> {
public TypeInformation<Row> getRecordType() { return flinkSchema.toRowType(); }
@Override
- public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { consumeDataStream(dataStreamTuple); }
-
- @Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) {
KuduSink upsertKuduSink = new KuduSink(writerConfigBuilder.build(), tableInfo, new UpsertOperationMapper(getTableSchema().getFieldNames()));
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
similarity index 96%
rename from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
index abf8a30..22fa0a4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.UUID;
-class KuduOuputFormatTest extends KuduTestBase {
+class KuduOutputFormatTest extends KuduTestBase {
@Test
void testInvalidKuduMaster() {
@@ -50,7 +50,7 @@ class KuduOuputFormatTest extends KuduTestBase {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
- Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
+ Assertions.assertThrows(RuntimeException.class, () -> outputFormat.open(0, 1));
}
@Test
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 077306d..4d74fda 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -73,7 +73,7 @@ public class KuduSinkTest extends KuduTestBase {
KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
sink.setRuntimeContext(context);
- Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+ Assertions.assertThrows(RuntimeException.class, () -> sink.open(new Configuration()));
}
@Test
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 3cef2d8..4bb1871 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -52,6 +52,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.*;
@@ -72,78 +73,89 @@ public class KuduCatalogTest extends KuduTestBase {
@Test
public void testCreateAlterDrop() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
- tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')");
+ tableEnv.executeSql("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.executeSql("INSERT INTO TestTable1 VALUES ('f', 's')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
// Add this once Primary key support has been enabled
// tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')");
// tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')");
- tableEnv.execute("test");
validateSingleKey("TestTable1");
// validateSingleKey("TestTable2");
- tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R");
+ tableEnv.executeSql("ALTER TABLE TestTable1 RENAME TO TestTable1R");
validateSingleKey("TestTable1R");
- tableEnv.sqlUpdate("DROP TABLE TestTable1R");
+ tableEnv.executeSql("DROP TABLE TestTable1R");
assertFalse(harness.getClient().tableExists("TestTable1R"));
}
@Test
public void testCreateAndInsertMultiKey() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
- tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')");
-
- // Add this once Primary key support has been enabled
- // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')");
- // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')");
-
- tableEnv.execute("test");
+ tableEnv.executeSql("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
+ tableEnv.executeSql("INSERT INTO TestTable3 VALUES ('f', 2, 't')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
validateMultiKey("TestTable3");
- // validateMultiKey("TestTable4");
}
@Test
public void testSourceProjection() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
- tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')");
- tableEnv.execute("test");
-
- tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
- tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)");
- tableEnv.execute("test");
+ tableEnv.executeSql("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
+ tableEnv.executeSql("INSERT INTO TestTable5 VALUES ('s', 'f', 't')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
+
+ tableEnv.executeSql("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.executeSql("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
validateSingleKey("TestTable6");
}
@Test
public void testEmptyProjection() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
- tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')");
- tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')");
- tableEnv.execute("test");
+ CollectionSink.output.clear();
+ tableEnv.executeSql("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f','s')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
+ tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f2','s2')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG));
- CollectionSink.output.clear();
-
resultDataStream
.map(t -> Tuple2.of(t.f0, t.f1.getField(0)))
.returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
.addSink(new CollectionSink<>()).setParallelism(1);
- tableEnv.execute("test");
+ resultDataStream.getExecutionEnvironment().execute();
List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L));
assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
CollectionSink.output.clear();
-
}
@Test
@@ -208,10 +220,13 @@ public class KuduCatalogTest extends KuduTestBase {
@Test
public void testTimestamp() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
+ tableEnv.executeSql("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
"WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
- tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
- tableEnv.execute("test");
+ tableEnv.executeSql("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
@@ -227,29 +242,32 @@ public class KuduCatalogTest extends KuduTestBase {
@Test
public void testDatatypes() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
+ tableEnv.executeSql("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
"`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " +
"`tenth` TIMESTAMP)" +
"WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
- tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
+ tableEnv.executeSql("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
"cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," +
- "TIMESTAMP '2020-04-15 12:34:56.123') ");
+ "TIMESTAMP '2020-04-15 12:34:56.123') ")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
- tableEnv.execute("test");
validateManyTypes("TestTable8");
}
@Test
public void testMissingPropertiesCatalog() throws Exception {
assertThrows(TableException.class,
- () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
+ () -> tableEnv.executeSql("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
"WITH ('kudu.primary-key-columns' = 'second')"));
assertThrows(TableException.class,
- () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+ () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
"WITH ('kudu.hash-columns' = 'first')"));
assertThrows(TableException.class,
- () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+ () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
"WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')"));
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index 3a2f776..d852f8e 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -17,8 +17,11 @@
package org.apache.flink.connectors.kudu.table;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
@@ -31,9 +34,10 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.*;
public class KuduTableFactoryTest extends KuduTestBase {
private StreamTableEnvironment tableEnv;
@@ -47,56 +51,59 @@ public class KuduTableFactoryTest extends KuduTestBase {
}
@Test
- public void testMissingTable() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
- "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')");
- assertThrows(NullPointerException.class,
- () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
- }
-
- @Test
public void testMissingMasters() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+ tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
- assertThrows(NullPointerException.class,
- () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+ assertThrows(TableException.class,
+ () -> tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)"));
}
@Test
public void testNonExistingTable() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+ tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
- tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)");
- assertThrows(java.util.concurrent.ExecutionException.class,
- () -> tableEnv.execute("test"));
+ JobClient jobClient = tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)").getJobClient().get();
+ try {
+ jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+ fail();
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof JobExecutionException);
+ }
}
@Test
public void testCreateTable() throws Exception {
- tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
+ tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
- tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')");
- tableEnv.execute("test");
+
+ tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 's')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
validateSingleKey("TestTable11");
}
+
@Test
public void testTimestamp() throws Exception {
// Timestamp should be bridged to sql.Timestamp
// Test it when creating the table...
- tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
- "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " +
+ tableEnv.executeSql("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
+ "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
- tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
- tableEnv.execute("test");
-
- // And also when inserting into existing table
- tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " +
- "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')");
-
- tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')");
- tableEnv.execute("test");
+ tableEnv.executeSql("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
+
+ tableEnv.executeSql("INSERT INTO TestTableTs values ('s', TIMESTAMP '2020-02-02 23:23:23')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
@@ -115,18 +122,24 @@ public class KuduTableFactoryTest extends KuduTestBase {
@Test
public void testExistingTable() throws Exception {
// Creating a table
- tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
+ tableEnv.executeSql("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
- tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')");
- tableEnv.execute("test");
+ tableEnv.executeSql("INSERT INTO TestTable12 values ('f', 's')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
// Then another one in SQL that refers to the previously created one
- tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
+ tableEnv.executeSql("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
- tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')");
- tableEnv.execute("test2");
+ tableEnv.executeSql("INSERT INTO TestTable12b values ('f2','s2')")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(getClass().getClassLoader())
+ .get(1, TimeUnit.MINUTES);
// Validate that both insertions were into the same table
KuduTable kuduTable = harness.getClient().openTable("TestTable12");
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index f5939fc..b83497c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -18,16 +18,16 @@ package org.apache.flink.connectors.kudu.table;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
-import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Integration tests for {@link KuduTableSource}.
@@ -48,8 +48,9 @@ public class KuduTableSourceITCase extends KuduTestBase {
@Test
void testFullBatchScan() throws Exception {
- Table query = tableEnv.sqlQuery("select * from books order by id");
- List<Row> results = TableUtils.collectToList(query);
+ CloseableIterator<Row> it = tableEnv.executeSql("select * from books order by id").collect();
+ List<Row> results = new ArrayList<>();
+ it.forEachRemaining(results::add);
assertEquals(5, results.size());
assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString());
tableEnv.sqlUpdate("DROP TABLE books");
@@ -58,8 +59,9 @@ public class KuduTableSourceITCase extends KuduTestBase {
@Test
void testScanWithProjectionAndFilter() throws Exception {
// (id in (1003, 1004) and quantity < 40)
- Table table = tableEnv.sqlQuery("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40");
- List<Row> results = TableUtils.collectToList(table);
+ CloseableIterator<Row> it = tableEnv.executeSql("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40").collect();
+ List<Row> results = new ArrayList<>();
+ it.forEachRemaining(results::add);
assertEquals(1, results.size());
assertEquals("More Java for more dummies", results.get(0).toString());
tableEnv.sqlUpdate("DROP TABLE books");
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index f4bb6ae..2dfb71b 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -121,8 +121,7 @@ public class KuduTableSourceTest extends KuduTestBase {
// expressions for supported predicates
FieldReferenceExpression fieldReferenceExpression = new FieldReferenceExpression(
"id", DataTypes.INT(), 0, 0);
- ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(
- 1, DataTypes.INT());
+ ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(1);
List<ResolvedExpression> args = new ArrayList<>(
Arrays.asList(fieldReferenceExpression, valueLiteralExpression));
Expression supportedPred = new CallExpression(
@@ -132,7 +131,7 @@ public class KuduTableSourceTest extends KuduTestBase {
// unsupported predicate
Expression unsupportedPred = new CallExpression(
new ScalarFunctionDefinition("dummy", DUMMY_FUNCTION),
- singletonList(new ValueLiteralExpression(1, DataTypes.INT())),
+ singletonList(new ValueLiteralExpression(1)),
DataTypes.INT());
// invalid predicate
Expression invalidPred = new CallExpression(
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index 54854fb..4eae7bf 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -19,7 +19,7 @@ package org.apache.flink.connectors.kudu.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
diff --git a/flink-connector-kudu/src/test/resources/log4j.properties b/flink-connector-kudu/src/test/resources/log4j2-test.properties
similarity index 70%
rename from flink-connector-kudu/src/test/resources/log4j.properties
rename to flink-connector-kudu/src/test/resources/log4j2-test.properties
index 15efe08..e463a0e 100644
--- a/flink-connector-kudu/src/test/resources/log4j.properties
+++ b/flink-connector-kudu/src/test/resources/log4j2-test.properties
@@ -16,12 +16,13 @@
# limitations under the License.
################################################################################
-# This file ensures that tests executed from the IDE show log output
+# Root logger level is set to INFO for debugging purposes;
+# set it manually to OFF to not flood build logs
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
-log4j.rootLogger=WARN, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
index 9f4d0be..08b5068 100644
--- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.netty.example
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.bridge.scala._
/**
* Simple example for demonstrating the use of SQL on a Stream Table.
@@ -55,8 +55,8 @@ object StreamSqlExample {
val tk = line.split(",")
Order(tk.head.trim.toLong, tk(1), tk(2).trim.toInt)
}
- // register the DataStreams under the name "OrderA" and "OrderB"
- tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+
+ tEnv.createTemporaryView("OrderA", orderA)
// union the two tables
val result = tEnv.sqlQuery("SELECT STREAM * FROM OrderA WHERE amount > 2")
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
index 8f1571b..59ab911 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -74,11 +74,6 @@ public class RedisTableSink implements UpsertStreamTableSink<Row> {
@Override
- public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
- consumeDataStream(dataStream);
- }
-
- @Override
public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
return new RedisTableSink(getProperties());
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 97293d0..9b52d6a 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.junit.Before;
@@ -66,7 +66,7 @@ public class RedisDescriptorTest extends RedisITCaseBase{
.connect(redis).withSchema(new Schema()
.field("k", TypeInformation.of(String.class))
.field("v", TypeInformation.of(Long.class)))
- .registerTableSink("redis");
+ .createTemporaryTable("redis");
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
diff --git a/pom.xml b/pom.xml
index 126e79d..109b528 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,9 +93,10 @@
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
+ <log4j2.version>2.13.3</log4j2.version>
<!-- Flink version -->
- <flink.version>1.10.0</flink.version>
+ <flink.version>1.11.2</flink.version>
<junit.jupiter.version>5.4.1</junit.jupiter.version>
@@ -784,4 +785,3 @@
</profiles>
</project>
-