You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text export.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/03/04 15:47:07 UTC
bahir-flink git commit: [BAHIR-91] Upgrade Flink version to 1.2.0
Repository: bahir-flink
Updated Branches:
refs/heads/master 9f306889f -> 3f180342c
[BAHIR-91] Upgrade Flink version to 1.2.0
This closes #11
This closes #9 (Closing PR due to inactivity)
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/3f180342
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/3f180342
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/3f180342
Branch: refs/heads/master
Commit: 3f180342ca0efaf48ffa4d1f7b6dbfcb4cd892b6
Parents: 9f30688
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 1 14:41:21 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Sat Mar 4 16:46:50 2017 +0100
----------------------------------------------------------------------
.travis.yml | 5 ++---
flink-connector-activemq/pom.xml | 1 +
.../connectors/activemq/AMQSourceTest.java | 19 ++++++++++++++++++
.../activemq/ActiveMQConnectorITCase.java | 21 +++++++++++++++-----
flink-connector-redis/pom.xml | 11 ++++++++++
pom.xml | 2 +-
6 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index fd3733a..53f31a5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,8 +18,7 @@
language: java
env:
-# - FLINK_VERSION="1.1.0"
- - FLINK_VERSION="1.1.1"
+ - FLINK_VERSION="1.2.0"
jdk:
- oraclejdk8
@@ -28,4 +27,4 @@ jdk:
install: true
-script: mvn clean verify -Dflink.version=$FLINK_VERSION
\ No newline at end of file
+script: mvn clean verify -Dflink.version=$FLINK_VERSION
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index d76ca89..b947f8e 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -85,6 +85,7 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
index 05d0d60..2e6efa6 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
@@ -97,6 +100,22 @@ public class AMQSourceTest {
amqSource = new AMQSource<>(config);
amqSource.setRuntimeContext(createRuntimeContext());
amqSource.open(new Configuration());
+ amqSource.initializeState(new FunctionInitializationContext() {
+ @Override
+ public boolean isRestored() {
+ return false;
+ }
+
+ @Override
+ public OperatorStateStore getOperatorStateStore() {
+ return mock(OperatorStateStore.class);
+ }
+
+ @Override
+ public KeyedStateStore getKeyedStateStore() {
+ return mock(KeyedStateStore.class);
+ }
+ });
}
private RuntimeContext createRuntimeContext() {
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
index 985e06d..24a257f 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -29,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -51,7 +52,7 @@ public class ActiveMQConnectorITCase {
public static final int MESSAGES_NUM = 10000;
public static final String QUEUE_NAME = "queue";
public static final String TOPIC_NAME = "topic";
- private static ForkableFlinkMiniCluster flink;
+ private static LocalFlinkMiniCluster flink;
private static int flinkPort;
@BeforeClass
@@ -63,7 +64,7 @@ public class ActiveMQConnectorITCase {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
flinkPort = flink.getLeaderRPCPort();
@@ -211,9 +212,19 @@ public class ActiveMQConnectorITCase {
while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
Thread.sleep(100);
Random random = new Random();
- long checkpointId = random.nextLong();
+ final long checkpointId = random.nextLong();
synchronized (sourceContext.getCheckpointLock()) {
- source.snapshotState(checkpointId, System.currentTimeMillis());
+ source.snapshotState(new FunctionSnapshotContext() {
+ @Override
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ @Override
+ public long getCheckpointTimestamp() {
+ return System.currentTimeMillis();
+ }
+ });
source.notifyCheckpointComplete(checkpointId);
}
}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 7920ca5..2412822 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -74,4 +74,15 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01818b0..b8458cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
<log4j.version>1.2.17</log4j.version>
<!-- Flink version -->
- <flink.version>1.1.1</flink.version>
+ <flink.version>1.2.0</flink.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>