You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:41 UTC

[kylin] 28/30: [KYLIN-3921] Bump Flink version from 1.7.1 to 1.9.0

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9bdc440c4fb60f468dafdd32afdbd6906d1c2762
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Sep 4 19:03:10 2019 +0800

    [KYLIN-3921] Bump Flink version from 1.7.1 to 1.9.0
---
 build/script/download-flink.sh                     | 15 +++++++------
 engine-flink/pom.xml                               |  2 +-
 .../engine/flink/FlinkOnYarnConfigMapping.java     | 25 +++++++++++-----------
 .../engine/flink/FlinkOnYarnConfigMappingTest.java | 25 +++++++++++-----------
 4 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/build/script/download-flink.sh b/build/script/download-flink.sh
index 1520408..1185904 100644
--- a/build/script/download-flink.sh
+++ b/build/script/download-flink.sh
@@ -27,36 +27,37 @@ if [[ `uname -a` =~ "Darwin" ]]; then
     alias md5cmd="md5 -q"
 fi
 
-flink_version="1.7.2"
+flink_version="1.9.0"
 scala_version="2.11"
 flink_pkg_md5="e0b5ce7f6352009c74b6c369f5872a5a"
 guava_dependency_version="14.0.1"
 jersey_version="1.9"
+hadoop_version="2.7.5"
 hbase_version="1.1.1"
 yammer_version="2.2.0"
 htrace_version="3.1.0-incubating"
 
-if [ ! -f "build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz" ]; then
+if [ ! -f "build/flink-${flink_version}-bin-scala_${scala_version}.tgz" ]; then
     echo "no binary file found"
-    wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz || echo "Download flink failed"
+    wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-scala_${scala_version}.tgz || echo "Download flink failed"
 else
     if [ `md5cmd build/ | awk '{print $1}'` != "${flink_pkg_md5}" ]; then
         echo "md5 check failed"
-        rm build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz
-        wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz || echo "Download flink failed"
+        rm build/flink-${flink_version}-bin-scala_${scala_version}.tgz
+        wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-scala_${scala_version}.tgz || echo "Download flink failed"
     fi
 fi
 unalias md5cmd
 
-tar -zxvf build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz -C build/   || { exit 1; }
+tar -zxvf build/flink-${flink_version}-bin-scala_${scala_version}.tgz -C build/   || { exit 1; }
 mv build/flink-${flink_version} build/flink
 
 # Remove unused components in Flink
-rm -f build/flink/lib/flink-python*
 rm -rf build/flink/examples
 rm -rf build/flink/opt
 
 # Download some dependencies
+wget --directory-prefix=build/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${hadoop_version}-7.0/flink-shaded-hadoop-2-uber-${hadoop_version}-7.0.jar || echo "Download flink shaded hadoop dependency failed."
 wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/com/google/guava/guava/${guava_dependency_version}/guava-${guava_dependency_version}.jar || echo "Download guava dependency failed."
 wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_${scala_version}/${flink_version}/flink-hadoop-compatibility_${scala_version}-${flink_version}.jar || echo "Download flink-hadoop-compatibility dependency failed."
 wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/com/sun/jersey/jersey-core/${jersey_version}/jersey-core-${jersey_version}.jar || echo "Download jersey-core dependency failed."
diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
index e646f84..d7dff84 100644
--- a/engine-flink/pom.xml
+++ b/engine-flink/pom.xml
@@ -34,7 +34,7 @@
     </parent>
 
     <properties>
-        <flink.version>1.7.1</flink.version>
+        <flink.version>1.9.0</flink.version>
     </properties>
 
     <dependencies>
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index d0371ca..154d4e2 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.engine.flink;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.FallbackKey;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 
@@ -39,38 +40,38 @@ public class FlinkOnYarnConfigMapping {
         //mapping job manager heap size -> -yjm
         ConfigOption<String> jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
         flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
-        if (jmHeapSizeOption.hasDeprecatedKeys()) {
-            Iterator<String> deprecatedKeyIterator = jmHeapSizeOption.deprecatedKeys().iterator();
+        if (jmHeapSizeOption.hasFallbackKeys()) {
+            Iterator<FallbackKey> deprecatedKeyIterator = jmHeapSizeOption.fallbackKeys().iterator();
             while (deprecatedKeyIterator.hasNext()) {
-                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yjm");
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yjm");
             }
         }
 
         //mapping task manager heap size -> -ytm
         ConfigOption<String> tmHeapSizeOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
         flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
-        if (tmHeapSizeOption.hasDeprecatedKeys()) {
-            Iterator<String> deprecatedKeyIterator = tmHeapSizeOption.deprecatedKeys().iterator();
+        if (tmHeapSizeOption.hasFallbackKeys()) {
+            Iterator<FallbackKey> deprecatedKeyIterator = tmHeapSizeOption.fallbackKeys().iterator();
             while (deprecatedKeyIterator.hasNext()) {
-                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ytm");
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-ytm");
             }
         }
 
         ConfigOption<Integer> taskSlotNumOption = TaskManagerOptions.NUM_TASK_SLOTS;
         flinkOnYarnConfigMap.put(taskSlotNumOption.key(), "-ys");
-        if (taskSlotNumOption.hasDeprecatedKeys()) {
-            Iterator<String> deprecatedKeyIterator = taskSlotNumOption.deprecatedKeys().iterator();
+        if (taskSlotNumOption.hasFallbackKeys()) {
+            Iterator<FallbackKey> deprecatedKeyIterator = taskSlotNumOption.fallbackKeys().iterator();
             while (deprecatedKeyIterator.hasNext()) {
-                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ys");
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-ys");
             }
         }
 
         ConfigOption<Boolean> tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
         flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate");
-        if (taskSlotNumOption.hasDeprecatedKeys()) {
-            Iterator<String> deprecatedKeyIterator = tmMemoryPreallocate.deprecatedKeys().iterator();
+        if (taskSlotNumOption.hasFallbackKeys()) {
+            Iterator<FallbackKey> deprecatedKeyIterator = tmMemoryPreallocate.fallbackKeys().iterator();
             while (deprecatedKeyIterator.hasNext()) {
-                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yD taskmanager.memory.preallocate");
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yD taskmanager.memory.preallocate");
             }
         }
 
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index 4355983..3cb6f28 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -17,6 +17,7 @@
 */
 package org.apache.kylin.engine.flink;
 
+import org.apache.flink.configuration.FallbackKey;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.junit.Assert;
@@ -41,11 +42,11 @@ public class FlinkOnYarnConfigMappingTest {
                 boolean matchedAnyOne;
                 matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key());
                 if (!matchedAnyOne) {
-                    if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasDeprecatedKeys()) {
-                        Iterator<String> deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
-                                .deprecatedKeys().iterator();
+                    if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
+                        Iterator<FallbackKey> deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
+                                .fallbackKeys().iterator();
                         while (deprecatedKeyIterator.hasNext()) {
-                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
                         }
                     }
                 }
@@ -66,11 +67,11 @@ public class FlinkOnYarnConfigMappingTest {
                 boolean matchedAnyOne;
                 matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key());
                 if (!matchedAnyOne) {
-                    if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasDeprecatedKeys()) {
-                        Iterator<String> deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
-                                .deprecatedKeys().iterator();
+                    if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
+                        Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
+                                .fallbackKeys().iterator();
                         while (deprecatedKeyIterator.hasNext()) {
-                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
                         }
                     }
                 }
@@ -91,11 +92,11 @@ public class FlinkOnYarnConfigMappingTest {
                 boolean matchedAnyOne;
                 matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.NUM_TASK_SLOTS.key());
                 if (!matchedAnyOne) {
-                    if (TaskManagerOptions.NUM_TASK_SLOTS.hasDeprecatedKeys()) {
-                        Iterator<String> deprecatedKeyIterator = TaskManagerOptions.NUM_TASK_SLOTS
-                                .deprecatedKeys().iterator();
+                    if (TaskManagerOptions.NUM_TASK_SLOTS.hasFallbackKeys()) {
+                        Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.NUM_TASK_SLOTS
+                                .fallbackKeys().iterator();
                         while (deprecatedKeyIterator.hasNext()) {
-                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+                            matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
                         }
                     }
                 }