You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/03/04 15:47:07 UTC

bahir-flink git commit: [BAHIR-91] Upgrade Flink version to 1.2.0

Repository: bahir-flink
Updated Branches:
  refs/heads/master 9f306889f -> 3f180342c


[BAHIR-91] Upgrade Flink version to 1.2.0

This closes #11
This closes #9 (Closing PR due to inactivity)


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/3f180342
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/3f180342
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/3f180342

Branch: refs/heads/master
Commit: 3f180342ca0efaf48ffa4d1f7b6dbfcb4cd892b6
Parents: 9f30688
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 1 14:41:21 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Sat Mar 4 16:46:50 2017 +0100

----------------------------------------------------------------------
 .travis.yml                                     |  5 ++---
 flink-connector-activemq/pom.xml                |  1 +
 .../connectors/activemq/AMQSourceTest.java      | 19 ++++++++++++++++++
 .../activemq/ActiveMQConnectorITCase.java       | 21 +++++++++++++++-----
 flink-connector-redis/pom.xml                   | 11 ++++++++++
 pom.xml                                         |  2 +-
 6 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index fd3733a..53f31a5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,8 +18,7 @@
 language: java
 
 env:
-#  - FLINK_VERSION="1.1.0"
-  - FLINK_VERSION="1.1.1"
+  - FLINK_VERSION="1.2.0"
 
 jdk:
   - oraclejdk8
@@ -28,4 +27,4 @@ jdk:
 
 install: true
 
-script: mvn clean verify -Dflink.version=$FLINK_VERSION
\ No newline at end of file
+script: mvn clean verify -Dflink.version=$FLINK_VERSION

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index d76ca89..b947f8e 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -85,6 +85,7 @@ under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.activemq.tooling</groupId>
             <artifactId>activemq-junit</artifactId>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
index 05d0d60..2e6efa6 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
@@ -97,6 +100,22 @@ public class AMQSourceTest {
         amqSource = new AMQSource<>(config);
         amqSource.setRuntimeContext(createRuntimeContext());
         amqSource.open(new Configuration());
+        amqSource.initializeState(new FunctionInitializationContext() {
+            @Override
+            public boolean isRestored() {
+                return false;
+            }
+
+            @Override
+            public OperatorStateStore getOperatorStateStore() {
+                return mock(OperatorStateStore.class);
+            }
+
+            @Override
+            public KeyedStateStore getKeyedStateStore() {
+                return mock(KeyedStateStore.class);
+            }
+        });
     }
 
     private RuntimeContext createRuntimeContext() {

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
index 985e06d..24a257f 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -29,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -51,7 +52,7 @@ public class ActiveMQConnectorITCase {
     public static final int MESSAGES_NUM = 10000;
     public static final String QUEUE_NAME = "queue";
     public static final String TOPIC_NAME = "topic";
-    private static ForkableFlinkMiniCluster flink;
+    private static LocalFlinkMiniCluster flink;
     private static int flinkPort;
 
     @BeforeClass
@@ -63,7 +64,7 @@ public class ActiveMQConnectorITCase {
         flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
         flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-        flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+        flink = new LocalFlinkMiniCluster(flinkConfig, false);
         flink.start();
 
         flinkPort = flink.getLeaderRPCPort();
@@ -211,9 +212,19 @@ public class ActiveMQConnectorITCase {
         while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
             Thread.sleep(100);
             Random random = new Random();
-            long checkpointId = random.nextLong();
+            final long checkpointId = random.nextLong();
             synchronized (sourceContext.getCheckpointLock()) {
-                source.snapshotState(checkpointId, System.currentTimeMillis());
+                source.snapshotState(new FunctionSnapshotContext() {
+                    @Override
+                    public long getCheckpointId() {
+                        return checkpointId;
+                    }
+
+                    @Override
+                    public long getCheckpointTimestamp() {
+                        return System.currentTimeMillis();
+                    }
+                });
                 source.notifyCheckpointComplete(checkpointId);
             }
         }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 7920ca5..2412822 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -74,4 +74,15 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01818b0..b8458cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Flink version -->
-    <flink.version>1.1.1</flink.version>
+    <flink.version>1.2.0</flink.version>
 
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>