You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/31 09:06:36 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 0fefff4  [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
0fefff4 is described below

commit 0fefff4d9cf44116e74a0a7b1fb039ae5e469cf9
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Aug 26 11:38:01 2020 +0800

    [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
    
    ### What is this PR for?
    There's one change in flink 1.10.2 which cause flink interpreter broken. This PR fix this issue and also upgrade flink version to 1.10.2 in pom and integration test.
    
    ### What type of PR is it?
    [Bug Fix | Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * There's one change in flink 1.10.2 which cause flink interpreter broken. This PR fix this issue and also upgrade flink version to 1.10.2 in pom and integration test.
    
    ### How should this be tested?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5016
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3891 from zjffdu/ZEPPELIN-5016 and squashes the following commits:
    
    e1eee58cb [Jeff Zhang] [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
    
    (cherry picked from commit b1966cac03f98833933a3f3749997baf5120bf1e)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../main/java/org/apache/zeppelin/flink/Flink110Shims.java | 14 +++++++++++++-
 flink/pom.xml                                              |  2 +-
 .../zeppelin/integration/FlinkIntegrationTest110.java      |  2 +-
 .../zeppelin/integration/ZeppelinFlinkClusterTest110.java  |  2 +-
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 39ab6e0..fbb1379 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.nio.file.Files;
 import java.util.HashMap;
@@ -225,7 +227,17 @@ public class Flink110Shims extends FlinkShims {
 
   @Override
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
-    return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) commandLine);
+    try {
+      return ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+    } catch (NoSuchMethodError e) {
+      try {
+        Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
+        return method.invoke((CliFrontend) cliFrontend, commandLine);
+      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
+        LOGGER.error("Fail to call getCustomCli", ex);
+        throw new RuntimeException("Fail to call getCustomCli", ex);
+      }
+    }
   }
 
   @Override
diff --git a/flink/pom.xml b/flink/pom.xml
index 5e0ec61..3e489a1 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -42,7 +42,7 @@
     </modules>
 
     <properties>
-        <flink1.10.version>1.10.1</flink1.10.version>
+        <flink1.10.version>1.10.2</flink1.10.version>
         <flink1.11.version>1.11.1</flink1.11.version>
     </properties>
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
index b8cf293..7779d2f 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -29,7 +29,7 @@ public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.1"}
+            {"1.10.2"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
index 20479de..4400706 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
@@ -29,7 +29,7 @@ public class ZeppelinFlinkClusterTest110 extends ZeppelinFlinkClusterTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.1"}
+            {"1.10.2"}
     });
   }