You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/31 09:06:14 UTC

[zeppelin] branch master updated: [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new b1966ca  [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
b1966ca is described below

commit b1966cac03f98833933a3f3749997baf5120bf1e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Aug 26 11:38:01 2020 +0800

    [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
    
    ### What is this PR for?
    A change in Flink 1.10.2 breaks the Flink interpreter. This PR fixes the issue and also upgrades the Flink version to 1.10.2 in the pom and the integration tests.
    
    ### What type of PR is it?
    [Bug Fix | Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * A change in Flink 1.10.2 breaks the Flink interpreter. This PR fixes the issue and also upgrades the Flink version to 1.10.2 in the pom and the integration tests.
    
    ### How should this be tested?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5016
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3891 from zjffdu/ZEPPELIN-5016 and squashes the following commits:
    
    e1eee58cb [Jeff Zhang] [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
---
 .../main/java/org/apache/zeppelin/flink/Flink110Shims.java | 14 +++++++++++++-
 flink/pom.xml                                              |  2 +-
 .../zeppelin/integration/FlinkIntegrationTest110.java      |  2 +-
 .../zeppelin/integration/ZeppelinFlinkClusterTest110.java  |  2 +-
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 39ab6e0..fbb1379 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.nio.file.Files;
 import java.util.HashMap;
@@ -225,7 +227,17 @@ public class Flink110Shims extends FlinkShims {
 
   @Override
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
-    return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) commandLine);
+    try {
+      return ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+    } catch (NoSuchMethodError e) {
+      try {
+        Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
+        return method.invoke((CliFrontend) cliFrontend, commandLine);
+      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
+        LOGGER.error("Fail to call getCustomCli", ex);
+        throw new RuntimeException("Fail to call getCustomCli", ex);
+      }
+    }
   }
 
   @Override
diff --git a/flink/pom.xml b/flink/pom.xml
index 5e0ec61..3e489a1 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -42,7 +42,7 @@
     </modules>
 
     <properties>
-        <flink1.10.version>1.10.1</flink1.10.version>
+        <flink1.10.version>1.10.2</flink1.10.version>
         <flink1.11.version>1.11.1</flink1.11.version>
     </properties>
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
index b8cf293..7779d2f 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -29,7 +29,7 @@ public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.1"}
+            {"1.10.2"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
index 20479de..4400706 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
@@ -29,7 +29,7 @@ public class ZeppelinFlinkClusterTest110 extends ZeppelinFlinkClusterTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.1"}
+            {"1.10.2"}
     });
   }