You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/05/08 06:10:08 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new cc73fcc  [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12
cc73fcc is described below

commit cc73fcc1c26168fb20595d1e8d44721a42409e9f
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed May 6 22:02:13 2020 +0800

    [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12
    
    ### What is this PR for?
    
    This PR will throw exception when using flink for scala 2.12 which is not supported now. See the screenshot below.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4790
    
    ### How should this be tested?
    * Manually tested.
    
    ### Screenshots (if appropriate)
    
    ![image](https://user-images.githubusercontent.com/164491/81144289-4f89de80-8fa6-11ea-9df0-8abba0406b7c.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3763 from zjffdu/ZEPPELIN-4790 and squashes the following commits:
    
    eac7b9a65 [Jeff Zhang] address comment
    57b0737da [Jeff Zhang] [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12
    
    (cherry picked from commit e229682affc9dc46a5256e4d8587ecd8f059df94)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/flink/FlinkInterpreter.java    | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 4565fc0..f02c21b 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -17,13 +17,10 @@
 
 package org.apache.zeppelin.flink;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -48,18 +45,33 @@ public class FlinkInterpreter extends Interpreter {
 
   public FlinkInterpreter(Properties properties) {
     super(properties);
-    this.innerIntp = new FlinkScalaInterpreter(getProperties());
+  }
+
+  private void checkScalaVersion() throws InterpreterException {
+    String scalaVersionString = scala.util.Properties.versionString();
+    LOGGER.info("Using Scala: " + scalaVersionString);
+    if (scalaVersionString.contains("version 2.11")) {
+      return;
+    } else {
+      throw new InterpreterException("Unsupported scala version: " + scalaVersionString +
+              ", Only scala 2.11 is supported");
+    }
   }
 
   @Override
   public void open() throws InterpreterException {
+    checkScalaVersion();
+    
+    this.innerIntp = new FlinkScalaInterpreter(getProperties());
     this.innerIntp.open();
     this.z = this.innerIntp.getZeppelinContext();
   }
 
   @Override
   public void close() throws InterpreterException {
-    this.innerIntp.close();
+    if (this.innerIntp != null) {
+      this.innerIntp.close();
+    }
   }
 
   @Override