You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2021/12/26 17:27:16 UTC
[bahir-flink] branch master updated: [BAHIR-291] Bump flink to 1.14.0 (#136)
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 245ac2c [BAHIR-291] Bump flink to 1.14.0 (#136)
245ac2c is described below
commit 245ac2c0ce4d4bd3c6767991af3406302b173ce5
Author: Roc Marshal <fl...@126.com>
AuthorDate: Mon Dec 27 01:27:08 2021 +0800
[BAHIR-291] Bump flink to 1.14.0 (#136)
---
.github/workflows/maven-ci.yml | 2 +-
.travis.yml | 4 +-
flink-connector-activemq/pom.xml | 2 +-
.../connectors/activemq/AMQSourceTest.java | 6 ++
.../connectors/influxdb/common/DataPoint.java | 4 +-
.../source/enumerator/InfluxDBSplitEnumerator.java | 2 +-
flink-connector-kudu/pom.xml | 2 +-
.../flink/connectors/kudu/table/KuduCatalog.java | 4 +-
.../connectors/kudu/table/KuduCatalogFactory.java | 7 +-
.../connectors/kudu/table/KuduTableFactory.java | 2 +-
.../connectors/kudu/table/KuduTableSource.java | 2 +-
.../kudu/table/utils/KuduTableUtils.java | 2 +-
flink-connector-netty/pom.xml | 2 +-
flink-connector-pinot/pom.xml | 6 +-
.../connectors/redis/RedisTableSinkFactory.java | 3 +-
.../connectors/redis/descriptor/Redis.java | 35 +++---
.../connectors/redis/RedisDescriptorTest.java | 31 +++---
.../redis/common/RedisSinkZIncrByTest.java | 120 ++++++++++-----------
flink-library-siddhi/pom.xml | 2 +-
.../siddhi/operator/AbstractSiddhiOperator.java | 4 +-
pom.xml | 2 +-
21 files changed, 121 insertions(+), 123 deletions(-)
diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index f1b1ed8..e16453e 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -30,7 +30,7 @@ jobs:
strategy:
matrix:
java: ['8', '11']
- flink-version: ['1.12.2']
+ flink-version: ['1.14.0']
scala-version: ['2.11', '2.12']
steps:
diff --git a/.travis.yml b/.travis.yml
index b6793dc..403ce86 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -34,8 +34,8 @@ jdk:
- openjdk8
env:
- - FLINK_VERSION="1.12.2" SCALA_VERSION="2.11"
- - FLINK_VERSION="1.12.2" SCALA_VERSION="2.12"
+ - FLINK_VERSION="1.14.0" SCALA_VERSION="2.11"
+ - FLINK_VERSION="1.14.0" SCALA_VERSION="2.12"
before_install:
- ./dev/change-scala-version.sh $SCALA_VERSION
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index a19a620..cef4db0 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -78,7 +78,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
index b4f71da..afe714a 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -37,6 +37,7 @@ import scala.Array;
import javax.jms.*;
import java.util.Collections;
+import java.util.OptionalLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -98,6 +99,11 @@ public class AMQSourceTest {
}
@Override
+ public OptionalLong getRestoredCheckpointId() {
+ return mock(OptionalLong.class);
+ }
+
+ @Override
public OperatorStateStore getOperatorStateStore() {
return mock(OperatorStateStore.class);
}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
index 3539881..5e3f86a 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
@@ -44,8 +44,8 @@ import javax.annotation.Nullable;
public final class DataPoint {
private final String measurement;
- private final Map<String, String> tags = new HashMap();
- private final Map<String, Object> fields = new HashMap();
+ private final Map<String, String> tags = new HashMap<>();
+ private final Map<String, Object> fields = new HashMap<>();
private final Long timestamp;
DataPoint(final String measurementName, @Nullable final Long timestamp) {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
index b7a2c11..9ac9b23 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
@@ -56,7 +56,7 @@ public final class InfluxDBSplitEnumerator
}
@Override
- public InfluxDBSourceEnumState snapshotState() {
+ public InfluxDBSourceEnumState snapshotState(long l) throws Exception {
return new InfluxDBSourceEnumState();
}
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index a76102e..ac6cdc5 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -51,7 +51,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 2ca7c0e..d8343e8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.AlterTableOptions;
@@ -237,7 +237,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
- Map<String, String> tableProperties = table.getProperties();
+ Map<String, String> tableProperties = table.getOptions();
TableSchema tableSchema = table.getSchema();
Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
index 30aaa40..2458a56 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -31,9 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-
/**
* Factory for {@link KuduCatalog}.
*/
@@ -45,8 +42,8 @@ public class KuduCatalogFactory implements CatalogFactory {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
- context.put(CATALOG_TYPE, KuduTableFactory.KUDU);
- context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+ context.put("type", KuduTableFactory.KUDU);
+ context.put("property-version", "1"); // backwards compatibility
return context;
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 524f521..a2883af 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -132,7 +132,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
validateTable(table);
String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
- return createTableSource(tableName, table.getSchema(), table.getProperties());
+ return createTableSource(tableName, table.getSchema(), table.getOptions());
}
private KuduTableSource createTableSource(String tableName, TableSchema schema, Map<String, String> props) {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index db73df3..fea7e73 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 53f205d..aa9b34e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
diff --git a/flink-connector-netty/pom.xml b/flink-connector-netty/pom.xml
index a93a067..eaff1dd 100644
--- a/flink-connector-netty/pom.xml
+++ b/flink-connector-netty/pom.xml
@@ -73,7 +73,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
index 74eacd5..85846bf 100644
--- a/flink-connector-pinot/pom.xml
+++ b/flink-connector-pinot/pom.xml
@@ -62,6 +62,10 @@ under the License.
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -114,7 +118,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index 16dfdbc..a90ffa7 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -30,7 +30,6 @@ import java.util.Map;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
import static org.apache.flink.table.descriptors.Schema.*;
/**
@@ -65,7 +64,7 @@ public class RedisTableSinkFactory implements StreamTableSinkFactory<Tuple2<Bool
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
// format wildcard
- properties.add(FORMAT + ".*");
+ properties.add("format.*");
properties.add(CONNECTOR + ".*");
return properties;
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
index 86fcd29..2da3dde 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -17,31 +17,32 @@
package org.apache.flink.streaming.connectors.redis.descriptor;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.util.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
/**
* redis descriptor for create redis connector.
*/
-public class Redis extends ConnectorDescriptor {
+public class Redis extends ConnectorDescriptorValidator {
- Map<String, String> properties = new HashMap<>();
+ private final DescriptorProperties properties;
private String mode = null;
private String redisCommand = null;
private Integer ttl;
- public Redis(String type, int version, boolean formatNeeded) {
- super(REDIS, version, formatNeeded);
+ public Redis(String type, int version) {
+ super();
+ properties = new DescriptorProperties();
+ properties.putString("connector.type", type);
+ properties.putInt("connector.property-version", version);
}
public Redis() {
- this(REDIS, 1, false);
+ this(REDIS, 1);
}
/**
@@ -51,18 +52,18 @@ public class Redis extends ConnectorDescriptor {
*/
public Redis command(String redisCommand) {
this.redisCommand = redisCommand;
- properties.put(REDIS_COMMAND, redisCommand);
+ properties.putString(REDIS_COMMAND, redisCommand);
return this;
}
/**
* ttl for specified key.
* @param ttl time for key.
- * @returnthis descriptor
+ * @return this descriptor
*/
public Redis ttl(Integer ttl) {
this.ttl = ttl;
- properties.put(REDIS_KEY_TTL, String.valueOf(ttl));
+ properties.putInt(REDIS_KEY_TTL, ttl);
return this;
}
@@ -73,7 +74,7 @@ public class Redis extends ConnectorDescriptor {
*/
public Redis mode(String mode) {
this.mode = mode;
- properties.put(REDIS_MODE, mode);
+ properties.putString(REDIS_MODE, mode);
return this;
}
@@ -84,16 +85,10 @@ public class Redis extends ConnectorDescriptor {
* @return this descriptor
*/
public Redis property(String k, String v) {
- properties.put(k, v);
+ properties.putString(k, v);
return this;
}
- @Override
- protected Map<String, String> toConnectorProperties() {
- validate();
- return properties;
- }
-
/**
* validate the necessary properties for redis descriptor.
*/
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 3b48ea2..abb1b2e 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -22,18 +22,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
-import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
-public class RedisDescriptorTest extends RedisITCaseBase{
+public class RedisDescriptorTest extends RedisITCaseBase {
private static final String REDIS_KEY = "TEST_KEY";
@@ -51,23 +46,26 @@ public class RedisDescriptorTest extends RedisITCaseBase{
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
tableEnvironment.registerDataStream("t1", source, "k, v");
- Redis redis = new Redis()
+ /*Redis redis = new Redis()
.mode(RedisValidator.REDIS_CLUSTER)
.command(RedisCommand.INCRBY_EX.name())
.ttl(100000)
- .property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);
-
- tableEnvironment
- .connect(redis).withSchema(new Schema()
- .field("k", TypeInformation.of(String.class))
- .field("v", TypeInformation.of(Long.class)))
- .createTemporaryTable("redis");
+ .property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);*/
+
+ tableEnvironment.executeSql("create table redis " +
+ "(k string, " +
+ "v bigint) " +
+ "with (" +
+ "'connector.type'='redis'," +
+ "'redis-mode'='cluster'," +
+ "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST, REDIS_PORT)+"'," +
+ "'command'='INCRBY_EX'," +
+ "'key.ttl'='100000')");
tableEnvironment.executeSql("insert into redis select k, v from t1");
}
@@ -79,7 +77,6 @@ public class RedisDescriptorTest extends RedisITCaseBase{
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
index 1b6eebc..132451b 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisSinkZIncrByTest.java
@@ -37,85 +37,85 @@ import java.util.Optional;
public class RedisSinkZIncrByTest extends RedisITCaseBase {
- private static final String REDIS_CLUSTER_HOSTS = "redis-01:7000,redis-02:7000,redis-03:7000";
-
- private static final HashSet<InetSocketAddress> NODES = new HashSet<InetSocketAddress>();
-
- @Before
- public void before() throws Exception {
- String[] hostList = REDIS_CLUSTER_HOSTS.split(",", -1);
- for (String host : hostList) {
- String[] parts = host.split(":", 2);
- if (parts.length > 1) {
- NODES.add(InetSocketAddress.createUnresolved(parts[0], Integer.valueOf(parts[1])));
- } else {
- throw new MalformedURLException("invalid redis hosts format");
- }
- }
+ private static final String REDIS_CLUSTER_HOSTS = "redis-01:7000,redis-02:7000,redis-03:7000";
+
+ private static final HashSet<InetSocketAddress> NODES = new HashSet<InetSocketAddress>();
+
+ @Before
+ public void before() throws Exception {
+ String[] hostList = REDIS_CLUSTER_HOSTS.split(",", -1);
+ for (String host : hostList) {
+ String[] parts = host.split(":", 2);
+ if (parts.length > 1) {
+ NODES.add(InetSocketAddress.createUnresolved(parts[0], Integer.valueOf(parts[1])));
+ } else {
+ throw new MalformedURLException("invalid redis hosts format");
+ }
}
+ }
- @Test
- public void redisSinkTest() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ @Test
+ public void redisSinkTest() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- FlinkJedisClusterConfig jedisClusterConfig = new FlinkJedisClusterConfig.Builder()
- .setNodes(NODES).build();
- DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new TestSourceFunction());
+ FlinkJedisClusterConfig jedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+ .setNodes(NODES).build();
+ DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new TestSourceFunction());
- RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(jedisClusterConfig, new RedisTestMapper());
+ RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(jedisClusterConfig, new RedisTestMapper());
- source.addSink(redisSink);
+ source.addSink(redisSink);
- env.execute("Redis Sink Test");
- }
+ env.execute("Redis Sink Test");
+ }
- @After
- public void after() throws Exception {
+ @After
+ public void after() throws Exception {
- }
+ }
- private static class TestSourceFunction implements SourceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
+ private static class TestSourceFunction implements SourceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
- private volatile boolean running = true;
+ private volatile boolean running = true;
- @Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
- for (int i = 0; i < 10 && running; i++) {
- ctx.collect(new Tuple2<>("test_" + i, i));
- }
- }
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ for (int i = 0; i < 10 && running; i++) {
+ ctx.collect(new Tuple2<>("test_" + i, i));
+ }
+ }
- @Override
- public void cancel() {
- running = false;
- }
+ @Override
+ public void cancel() {
+ running = false;
}
+ }
- private static class RedisTestMapper implements RedisMapper<Tuple2<String, Integer>> {
- private static final String ZINCRBY_NAME_PREFIX = "RANKING";
+ private static class RedisTestMapper implements RedisMapper<Tuple2<String, Integer>> {
+ private static final String ZINCRBY_NAME_PREFIX = "RANKING";
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(RedisCommand.ZINCRBY, ZINCRBY_NAME_PREFIX);
- }
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(RedisCommand.ZINCRBY, ZINCRBY_NAME_PREFIX);
+ }
- @Override
- public String getKeyFromData(Tuple2<String, Integer> data) {
- return data.f0;
- }
+ @Override
+ public String getKeyFromData(Tuple2<String, Integer> data) {
+ return data.f0;
+ }
- @Override
- public String getValueFromData(Tuple2<String, Integer> data) {
- return data.f1.toString();
- }
+ @Override
+ public String getValueFromData(Tuple2<String, Integer> data) {
+ return data.f1.toString();
+ }
- @Override
- public Optional<String> getAdditionalKey(Tuple2<String, Integer> data) {
- String key = ZINCRBY_NAME_PREFIX + ":" + "TEST";
- return Optional.of(key);
- }
+ @Override
+ public Optional<String> getAdditionalKey(Tuple2<String, Integer> data) {
+ String key = ZINCRBY_NAME_PREFIX + ":" + "TEST";
+ return Optional.of(key);
}
+ }
}
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 44e1ec2..10e6d14 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -72,7 +72,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index b796a88..017831e 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -271,10 +271,10 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
}
@Override
- public void dispose() throws Exception {
+ public void close() throws Exception {
shutdownSiddhiRuntime();
this.siddhiRuntimeState.clear();
- super.dispose();
+ super.close();
}
@Override
diff --git a/pom.xml b/pom.xml
index 26a8d98..02447ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
<log4j2.version>2.13.3</log4j2.version>
<!-- Flink version -->
- <flink.version>1.12.2</flink.version>
+ <flink.version>1.14.0</flink.version>
<junit.jupiter.version>5.4.1</junit.jupiter.version>