You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:41 UTC
[kylin] 28/30: [KYLIN-3921] Bump Flink version from 1.7.1 to 1.9.0
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9bdc440c4fb60f468dafdd32afdbd6906d1c2762
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Sep 4 19:03:10 2019 +0800
[KYLIN-3921] Bump Flink version from 1.7.1 to 1.9.0
---
build/script/download-flink.sh | 15 +++++++------
engine-flink/pom.xml | 2 +-
.../engine/flink/FlinkOnYarnConfigMapping.java | 25 +++++++++++-----------
.../engine/flink/FlinkOnYarnConfigMappingTest.java | 25 +++++++++++-----------
4 files changed, 35 insertions(+), 32 deletions(-)
diff --git a/build/script/download-flink.sh b/build/script/download-flink.sh
index 1520408..1185904 100644
--- a/build/script/download-flink.sh
+++ b/build/script/download-flink.sh
@@ -27,36 +27,37 @@ if [[ `uname -a` =~ "Darwin" ]]; then
alias md5cmd="md5 -q"
fi
-flink_version="1.7.2"
+flink_version="1.9.0"
scala_version="2.11"
flink_pkg_md5="e0b5ce7f6352009c74b6c369f5872a5a"
guava_dependency_version="14.0.1"
jersey_version="1.9"
+hadoop_version="2.7.5"
hbase_version="1.1.1"
yammer_version="2.2.0"
htrace_version="3.1.0-incubating"
-if [ ! -f "build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz" ]; then
+if [ ! -f "build/flink-${flink_version}-bin-scala_${scala_version}.tgz" ]; then
echo "no binary file found"
- wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz || echo "Download flink failed"
+ wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-scala_${scala_version}.tgz || echo "Download flink failed"
else
-if [ `md5cmd build/ | awk '{print $1}'` != "${flink_pkg_md5}" ]; then
+if [ "`md5cmd build/flink-${flink_version}-bin-scala_${scala_version}.tgz | awk '{print $1}'`" != "${flink_pkg_md5}" ]; then # hash the archive, not the build/ dir; NOTE(review): confirm flink_pkg_md5 matches the ${flink_version} archive
echo "md5 check failed"
- rm build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz
- wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz || echo "Download flink failed"
+ rm build/flink-${flink_version}-bin-scala_${scala_version}.tgz
+ wget --directory-prefix=build/ http://archive.apache.org/dist/flink/flink-${flink_version}/flink-${flink_version}-bin-scala_${scala_version}.tgz || echo "Download flink failed"
fi
fi
unalias md5cmd
-tar -zxvf build/flink-${flink_version}-bin-hadoop27-scala_${scala_version}.tgz -C build/ || { exit 1; }
+tar -zxvf build/flink-${flink_version}-bin-scala_${scala_version}.tgz -C build/ || { exit 1; }
mv build/flink-${flink_version} build/flink
# Remove unused components in Flink
-rm -f build/flink/lib/flink-python*
rm -rf build/flink/examples
rm -rf build/flink/opt
# Download some dependencies
+wget --directory-prefix=build/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${hadoop_version}-7.0/flink-shaded-hadoop-2-uber-${hadoop_version}-7.0.jar || echo "Download flink shaded hadoop dependency failed."
wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/com/google/guava/guava/${guava_dependency_version}/guava-${guava_dependency_version}.jar || echo "Download guava dependency failed."
wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_${scala_version}/${flink_version}/flink-hadoop-compatibility_${scala_version}-${flink_version}.jar || echo "Download flink-hadoop-compatibility dependency failed."
wget --directory-prefix=build/flink/lib/ http://central.maven.org/maven2/com/sun/jersey/jersey-core/${jersey_version}/jersey-core-${jersey_version}.jar || echo "Download jersey-core dependency failed."
diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
index e646f84..d7dff84 100644
--- a/engine-flink/pom.xml
+++ b/engine-flink/pom.xml
@@ -34,7 +34,7 @@
</parent>
<properties>
- <flink.version>1.7.1</flink.version>
+ <flink.version>1.9.0</flink.version>
</properties>
<dependencies>
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index d0371ca..154d4e2 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.flink;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.FallbackKey;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -39,38 +40,38 @@ public class FlinkOnYarnConfigMapping {
//mapping job manager heap size -> -yjm
ConfigOption<String> jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
- if (jmHeapSizeOption.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = jmHeapSizeOption.deprecatedKeys().iterator();
+ if (jmHeapSizeOption.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = jmHeapSizeOption.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yjm");
+ flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yjm");
}
}
//mapping task manager heap size -> -ytm
ConfigOption<String> tmHeapSizeOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
- if (tmHeapSizeOption.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = tmHeapSizeOption.deprecatedKeys().iterator();
+ if (tmHeapSizeOption.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = tmHeapSizeOption.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ytm");
+ flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-ytm");
}
}
ConfigOption<Integer> taskSlotNumOption = TaskManagerOptions.NUM_TASK_SLOTS;
flinkOnYarnConfigMap.put(taskSlotNumOption.key(), "-ys");
- if (taskSlotNumOption.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = taskSlotNumOption.deprecatedKeys().iterator();
+ if (taskSlotNumOption.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = taskSlotNumOption.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ys");
+ flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-ys");
}
}
ConfigOption<Boolean> tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate");
- if (taskSlotNumOption.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = tmMemoryPreallocate.deprecatedKeys().iterator();
+ if (tmMemoryPreallocate.hasFallbackKeys()) { // guard on the option whose fallback keys are iterated below, not on taskSlotNumOption
+ Iterator<FallbackKey> deprecatedKeyIterator = tmMemoryPreallocate.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yD taskmanager.memory.preallocate");
+ flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yD taskmanager.memory.preallocate");
}
}
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index 4355983..3cb6f28 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.kylin.engine.flink;
+import org.apache.flink.configuration.FallbackKey;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.junit.Assert;
@@ -41,11 +42,11 @@ public class FlinkOnYarnConfigMappingTest {
boolean matchedAnyOne;
matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key());
if (!matchedAnyOne) {
- if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
- .deprecatedKeys().iterator();
+ if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
+ .fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+ matchedAnyOne = matchedAnyOne || flinkConfigOption.equals(deprecatedKeyIterator.next().getKey()); // OR: any fallback key may match; AND was always false in this branch
}
}
}
@@ -66,11 +67,11 @@ public class FlinkOnYarnConfigMappingTest {
boolean matchedAnyOne;
matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key());
if (!matchedAnyOne) {
- if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
- .deprecatedKeys().iterator();
+ if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
+ .fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+ matchedAnyOne = matchedAnyOne || flinkConfigOption.equals(deprecatedKeyIterator.next().getKey()); // OR: any fallback key may match; AND was always false in this branch
}
}
}
@@ -91,11 +92,11 @@ public class FlinkOnYarnConfigMappingTest {
boolean matchedAnyOne;
matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.NUM_TASK_SLOTS.key());
if (!matchedAnyOne) {
- if (TaskManagerOptions.NUM_TASK_SLOTS.hasDeprecatedKeys()) {
- Iterator<String> deprecatedKeyIterator = TaskManagerOptions.NUM_TASK_SLOTS
- .deprecatedKeys().iterator();
+ if (TaskManagerOptions.NUM_TASK_SLOTS.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.NUM_TASK_SLOTS
+ .fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
- matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next());
+ matchedAnyOne = matchedAnyOne || flinkConfigOption.equals(deprecatedKeyIterator.next().getKey()); // OR: any fallback key may match; AND was always false in this branch
}
}
}