You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2021/06/02 14:58:13 UTC

[GitHub] [zeppelin] cuspymd commented on a change in pull request #4128: [ZEPPELIN-5339] Support scala 2.12 for flink

cuspymd commented on a change in pull request #4128:
URL: https://github.com/apache/zeppelin/pull/4128#discussion_r644049588



##########
File path: flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
##########
@@ -25,45 +26,74 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URLClassLoader;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
- * Interpreter for flink scala. It delegates all the function to FlinkScalaInterpreter.
+ * Interpreter for flink scala. It delegates all the function to FlinkScalaInterpreter
+ * which is implemented by flink scala shell underneath.
  */
 public class FlinkInterpreter extends Interpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreter.class);
 
+  private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private FlinkScalaInterpreter innerIntp;
-  private FlinkZeppelinContext z;
+  private ZeppelinContext z;
 
   public FlinkInterpreter(Properties properties) {
     super(properties);
+    innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.flink.FlinkScala211Interpreter");
+    innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.flink.FlinkScala212Interpreter");
   }
 
-  private void checkScalaVersion() throws InterpreterException {
+  private String extractScalaVersion() throws InterpreterException {
     String scalaVersionString = scala.util.Properties.versionString();
     LOGGER.info("Using Scala: " + scalaVersionString);
     if (scalaVersionString.contains("version 2.11")) {
-      return;
+      return "2.11";
+    } else if (scalaVersionString.contains("version 2.12")) {
+      return "2.12";
     } else {
       throw new InterpreterException("Unsupported scala version: " + scalaVersionString +
-              ", Only scala 2.11 is supported");
+              ", Only scala 2.11/2.12 is supported");
     }
   }
 
   @Override
   public void open() throws InterpreterException {
-    checkScalaVersion();
-    
-    this.innerIntp = new FlinkScalaInterpreter(getProperties());
-    this.innerIntp.open();
-    this.z = this.innerIntp.getZeppelinContext();
+    try {
+      this.innerIntp = loadFlinkScalaInterpreter();
+      this.innerIntp.open();
+      this.z = this.innerIntp.getZeppelinContext();
+    } catch (Exception e) {
+      throw new InterpreterException("Fail to open FlinkInterpreter", e);

Review comment:
       It seems be strange to run again `new InterpreterException()` when `loadFlinkScalaInterpreter()` throws an exception of the type `InterpreterException`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org