You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2021/12/26 17:27:16 UTC

[bahir-flink] branch master updated: [BAHIR-291] Bump flink to 1.14.0 (#136)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 245ac2c  [BAHIR-291] Bump flink to 1.14.0 (#136)
245ac2c is described below

commit 245ac2c0ce4d4bd3c6767991af3406302b173ce5
Author: Roc Marshal <fl...@126.com>
AuthorDate: Mon Dec 27 01:27:08 2021 +0800

    [BAHIR-291] Bump flink to 1.14.0 (#136)
---
 .github/workflows/maven-ci.yml                     |   2 +-
 .travis.yml                                        |   4 +-
 flink-connector-activemq/pom.xml                   |   2 +-
 .../connectors/activemq/AMQSourceTest.java         |   6 ++
 .../connectors/influxdb/common/DataPoint.java      |   4 +-
 .../source/enumerator/InfluxDBSplitEnumerator.java |   2 +-
 flink-connector-kudu/pom.xml                       |   2 +-
 .../flink/connectors/kudu/table/KuduCatalog.java   |   4 +-
 .../connectors/kudu/table/KuduCatalogFactory.java  |   7 +-
 .../connectors/kudu/table/KuduTableFactory.java    |   2 +-
 .../connectors/kudu/table/KuduTableSource.java     |   2 +-
 .../kudu/table/utils/KuduTableUtils.java           |   2 +-
 flink-connector-netty/pom.xml                      |   2 +-
 flink-connector-pinot/pom.xml                      |   6 +-
 .../connectors/redis/RedisTableSinkFactory.java    |   3 +-
 .../connectors/redis/descriptor/Redis.java         |  35 +++---
 .../connectors/redis/RedisDescriptorTest.java      |  31 +++---
 .../redis/common/RedisSinkZIncrByTest.java         | 120 ++++++++++-----------
 flink-library-siddhi/pom.xml                       |   2 +-
 .../siddhi/operator/AbstractSiddhiOperator.java    |   4 +-
 pom.xml                                            |   2 +-
 21 files changed, 121 insertions(+), 123 deletions(-)

diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index f1b1ed8..e16453e 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -30,7 +30,7 @@ jobs:
     strategy:
       matrix:
        java: ['8', '11']
-       flink-version: ['1.12.2']
+       flink-version: ['1.14.0']
        scala-version: ['2.11', '2.12']
 
     steps:
diff --git a/.travis.yml b/.travis.yml
index b6793dc..403ce86 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -34,8 +34,8 @@ jdk:
   - openjdk8
 
 env:
-  - FLINK_VERSION="1.12.2" SCALA_VERSION="2.11"
-  - FLINK_VERSION="1.12.2" SCALA_VERSION="2.12"
+  - FLINK_VERSION="1.14.0" SCALA_VERSION="2.11"
+  - FLINK_VERSION="1.14.0" SCALA_VERSION="2.12"
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index a19a620..cef4db0 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -78,7 +78,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>flink-runtime</artifactId>
             <version>${flink.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
index b4f71da..afe714a 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -37,6 +37,7 @@ import scala.Array;
 
 import javax.jms.*;
 import java.util.Collections;
+import java.util.OptionalLong;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -98,6 +99,11 @@ public class AMQSourceTest {
             }
 
             @Override
+            public OptionalLong getRestoredCheckpointId() {
+                return mock(OptionalLong.class);
+            }
+
+            @Override
             public OperatorStateStore getOperatorStateStore() {
                 return mock(OperatorStateStore.class);
             }
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
index 3539881..5e3f86a 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
@@ -44,8 +44,8 @@ import javax.annotation.Nullable;
 public final class DataPoint {
 
     private final String measurement;
-    private final Map<String, String> tags = new HashMap();
-    private final Map<String, Object> fields = new HashMap();
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
     private final Long timestamp;
 
     DataPoint(final String measurementName, @Nullable final Long timestamp) {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
index b7a2c11..9ac9b23 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
@@ -56,7 +56,7 @@ public final class InfluxDBSplitEnumerator
     }
 
     @Override
-    public InfluxDBSourceEnumState snapshotState() {
+    public InfluxDBSourceEnumState snapshotState(long l) throws Exception {
         return new InfluxDBSourceEnumState();
     }
 
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index a76102e..ac6cdc5 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -51,7 +51,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 2ca7c0e..d8343e8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.AlterTableOptions;
@@ -237,7 +237,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
 
     @Override
     public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
-        Map<String, String> tableProperties = table.getProperties();
+        Map<String, String> tableProperties = table.getOptions();
         TableSchema tableSchema = table.getSchema();
 
         Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
index 30aaa40..2458a56 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -31,9 +31,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-
 /**
  * Factory for {@link KuduCatalog}.
  */
@@ -45,8 +42,8 @@ public class KuduCatalogFactory implements CatalogFactory {
     @Override
     public Map<String, String> requiredContext() {
         Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, KuduTableFactory.KUDU);
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+        context.put("type", KuduTableFactory.KUDU);
+        context.put("property-version", "1"); // backwards compatibility
         return context;
     }
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 524f521..a2883af 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -132,7 +132,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
     public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
         validateTable(table);
         String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
-        return createTableSource(tableName, table.getSchema(), table.getProperties());
+        return createTableSource(tableName, table.getSchema(), table.getOptions());
     }
 
     private KuduTableSource createTableSource(String tableName, TableSchema schema, Map<String, String> props) {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index db73df3..fea7e73 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 53f205d..aa9b34e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
diff --git a/flink-connector-netty/pom.xml b/flink-connector-netty/pom.xml
index a93a067..eaff1dd 100644
--- a/flink-connector-netty/pom.xml
+++ b/flink-connector-netty/pom.xml
@@ -73,7 +73,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
index 74eacd5..85846bf 100644
--- a/flink-connector-pinot/pom.xml
+++ b/flink-connector-pinot/pom.xml
@@ -62,6 +62,10 @@ under the License.
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-math3</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -114,7 +118,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>flink-runtime</artifactId>
             <version>${flink.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index 16dfdbc..a90ffa7 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static org.apache.flink.table.descriptors.Schema.*;
 
 /**
@@ -65,7 +64,7 @@ public class RedisTableSinkFactory implements StreamTableSinkFactory<Tuple2<Bool
         properties.add(SCHEMA + ".#." + SCHEMA_NAME);
         properties.add(SCHEMA + ".#." + SCHEMA_FROM);
         // format wildcard
-        properties.add(FORMAT + ".*");
+        properties.add("format.*");
         properties.add(CONNECTOR + ".*");
         return properties;
     }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
index 86fcd29..2da3dde 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -17,31 +17,32 @@
 
 package org.apache.flink.streaming.connectors.redis.descriptor;
 
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
 
 /**
  * redis descriptor for create redis connector.
  */
-public class Redis extends ConnectorDescriptor {
+public class Redis extends ConnectorDescriptorValidator {
 
-    Map<String, String> properties = new HashMap<>();
+    private final DescriptorProperties properties;
 
     private String mode = null;
     private String redisCommand = null;
     private Integer ttl;
 
-    public Redis(String type, int version, boolean formatNeeded) {
-        super(REDIS, version, formatNeeded);
+    public Redis(String type, int version) {
+        super();
+        properties = new DescriptorProperties();
+        properties.putString("connector.type", type);
+        properties.putInt("connector.property-version", version);
     }
 
     public Redis() {
-        this(REDIS, 1, false);
+        this(REDIS, 1);
     }
 
     /**
@@ -51,18 +52,18 @@ public class Redis extends ConnectorDescriptor {
      */
     public Redis command(String redisCommand) {
         this.redisCommand = redisCommand;
-            properties.put(REDIS_COMMAND, redisCommand);
+        properties.putString(REDIS_COMMAND, redisCommand);
         return this;
     }
 
     /**
      * ttl for specified key.
      * @param ttl time for key.
-     * @returnthis descriptor
+     * @return this descriptor
      */
     public Redis ttl(Integer ttl) {
         this.ttl = ttl;
-        properties.put(REDIS_KEY_TTL, String.valueOf(ttl));
+        properties.putInt(REDIS_KEY_TTL, ttl);
         return this;
     }
 
@@ -73,7 +74,7 @@ public class Redis extends ConnectorDescriptor {
      */
     public Redis mode(String mode) {
         this.mode = mode;
-        properties.put(REDIS_MODE, mode);
+        properties.putString(REDIS_MODE, mode);
         return this;
     }
 
@@ -84,16 +85,10 @@ public class Redis extends ConnectorDescriptor {
      * @return this descriptor
      */
     public Redis property(String k, String v) {
-        properties.put(k, v);
+        properties.putString(k, v);
         return this;
     }
 
-    @Override
-    protected Map<String, String> toConnectorProperties() {
-        validate();
-        return properties;
-    }
-
     /**
      * validate the necessary properties for redis descriptor.
      */
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 3b48ea2..abb1b2e 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -22,18 +22,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
-import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.types.Row;
 import org.junit.Before;
 import org.junit.Test;
 
-public class RedisDescriptorTest extends  RedisITCaseBase{
+public class RedisDescriptorTest extends  RedisITCaseBase {
 
     private static final String REDIS_KEY = "TEST_KEY";
 
@@ -51,23 +46,26 @@ public class RedisDescriptorTest extends  RedisITCaseBase{
 
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useOldPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
         tableEnvironment.registerDataStream("t1", source, "k, v");
 
-        Redis redis = new Redis()
+        /*Redis redis = new Redis()
                 .mode(RedisValidator.REDIS_CLUSTER)
                 .command(RedisCommand.INCRBY_EX.name())
                 .ttl(100000)
-                .property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);
-
-        tableEnvironment
-                .connect(redis).withSchema(new Schema()
-                .field("k", TypeInformation.of(String.class))
-                .field("v", TypeInformation.of(Long.class)))
-                .createTemporaryTable("redis");
+                .property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);*/
+
+        tableEnvironment.executeSql("create table redis " +
+                        "(k string, " +
+                        "v bigint) " +
+                        "with (" +
+                        "'connector.type'='redis'," +
+                        "'redis-mode'='cluster'," +
+                        "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST, REDIS_PORT)+"'," +
+                        "'command'='INCRBY_EX'," +
+                        "'key.ttl'='100000')");
 
         tableEnvironment.executeSql("insert into redis select k, v from t1");
     }
@@ -79,7 +77,6 @@ public class RedisDescriptorTest extends  RedisITCaseBase{
 
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useOldPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
index 1b6eebc..132451b 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
@@ -37,85 +37,85 @@ import java.util.Optional;
 
 public class RedisSinkZIncrByTest extends RedisITCaseBase {
 
-    private static final String REDIS_CLUSTER_HOSTS = "redis-01:7000,redis-02:7000,redis-03:7000";
-
-    private static final HashSet<InetSocketAddress> NODES = new HashSet<InetSocketAddress>();
-
-    @Before
-    public void before() throws Exception {
-        String[] hostList = REDIS_CLUSTER_HOSTS.split(",", -1);
-        for (String host : hostList) {
-            String[] parts = host.split(":", 2);
-            if (parts.length > 1) {
-                NODES.add(InetSocketAddress.createUnresolved(parts[0], Integer.valueOf(parts[1])));
-            } else {
-                throw new MalformedURLException("invalid redis hosts format");
-            }
-        }
+  private static final String REDIS_CLUSTER_HOSTS = "redis-01:7000,redis-02:7000,redis-03:7000";
+
+  private static final HashSet<InetSocketAddress> NODES = new HashSet<InetSocketAddress>();
+
+  @Before
+  public void before() throws Exception {
+    String[] hostList = REDIS_CLUSTER_HOSTS.split(",", -1);
+    for (String host : hostList) {
+      String[] parts = host.split(":", 2);
+      if (parts.length > 1) {
+        NODES.add(InetSocketAddress.createUnresolved(parts[0], Integer.valueOf(parts[1])));
+      } else {
+        throw new MalformedURLException("invalid redis hosts format");
+      }
     }
+  }
 
-    @Test
-    public void redisSinkTest() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  @Test
+  public void redisSinkTest() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        FlinkJedisClusterConfig jedisClusterConfig = new FlinkJedisClusterConfig.Builder()
-                .setNodes(NODES).build();
-        DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new TestSourceFunction());
+    FlinkJedisClusterConfig jedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+        .setNodes(NODES).build();
+    DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new TestSourceFunction());
 
-        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(jedisClusterConfig, new RedisTestMapper());
+    RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(jedisClusterConfig, new RedisTestMapper());
 
-        source.addSink(redisSink);
+    source.addSink(redisSink);
 
-        env.execute("Redis Sink Test");
-    }
+    env.execute("Redis Sink Test");
+  }
 
-    @After
-    public void after() throws Exception {
+  @After
+  public void after() throws Exception {
 
-    }
+  }
 
 
-    private static class TestSourceFunction implements SourceFunction<Tuple2<String, Integer>> {
-        private static final long serialVersionUID = 1L;
+  private static class TestSourceFunction implements SourceFunction<Tuple2<String, Integer>> {
+    private static final long serialVersionUID = 1L;
 
-        private volatile boolean running = true;
+    private volatile boolean running = true;
 
-        @Override
-        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-            for (int i = 0; i < 10 && running; i++) {
-                ctx.collect(new Tuple2<>("test_" + i, i));
-            }
-        }
+    @Override
+    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+      for (int i = 0; i < 10 && running; i++) {
+        ctx.collect(new Tuple2<>("test_" + i, i));
+      }
+    }
 
-        @Override
-        public void cancel() {
-            running = false;
-        }
+    @Override
+    public void cancel() {
+      running = false;
     }
+  }
 
 
-    private static class RedisTestMapper implements RedisMapper<Tuple2<String, Integer>> {
-        private static final String ZINCRBY_NAME_PREFIX = "RANKING";
+  private static class RedisTestMapper implements RedisMapper<Tuple2<String, Integer>> {
+    private static final String ZINCRBY_NAME_PREFIX = "RANKING";
 
-        @Override
-        public RedisCommandDescription getCommandDescription() {
-            return new RedisCommandDescription(RedisCommand.ZINCRBY, ZINCRBY_NAME_PREFIX);
-        }
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+      return new RedisCommandDescription(RedisCommand.ZINCRBY, ZINCRBY_NAME_PREFIX);
+    }
 
-        @Override
-        public String getKeyFromData(Tuple2<String, Integer> data) {
-            return data.f0;
-        }
+    @Override
+    public String getKeyFromData(Tuple2<String, Integer> data) {
+      return data.f0;
+    }
 
-        @Override
-        public String getValueFromData(Tuple2<String, Integer> data) {
-            return data.f1.toString();
-        }
+    @Override
+    public String getValueFromData(Tuple2<String, Integer> data) {
+      return data.f1.toString();
+    }
 
-        @Override
-        public Optional<String> getAdditionalKey(Tuple2<String, Integer> data) {
-            String key = ZINCRBY_NAME_PREFIX + ":" + "TEST";
-            return Optional.of(key);
-        }
+    @Override
+    public Optional<String> getAdditionalKey(Tuple2<String, Integer> data) {
+      String key = ZINCRBY_NAME_PREFIX + ":" + "TEST";
+      return Optional.of(key);
     }
+  }
 }
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 44e1ec2..10e6d14 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -72,7 +72,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>flink-runtime</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index b796a88..017831e 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -271,10 +271,10 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         shutdownSiddhiRuntime();
         this.siddhiRuntimeState.clear();
-        super.dispose();
+        super.close();
     }
 
     @Override
diff --git a/pom.xml b/pom.xml
index 26a8d98..02447ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
     <log4j2.version>2.13.3</log4j2.version>
 
     <!-- Flink version -->
-    <flink.version>1.12.2</flink.version>
+    <flink.version>1.14.0</flink.version>
 
     <junit.jupiter.version>5.4.1</junit.jupiter.version>