Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2019/06/10 07:10:01 UTC

[GitHub] [zeppelin] liuxunorg commented on a change in pull request #3375: ZEPPELIN-4176. Remove old spark interpreter

liuxunorg commented on a change in pull request #3375: ZEPPELIN-4176. Remove old spark interpreter
URL: https://github.com/apache/zeppelin/pull/3375#discussion_r291901506
 
 

 ##########
 File path: spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
 ##########
 @@ -1,1281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.spark.JobProgressUtil;
-import org.apache.spark.SecurityManager;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.SparkUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Console;
-import scala.Enumeration.Value;
-import scala.None;
-import scala.Option;
-import scala.Some;
-import scala.collection.JavaConversions;
-import scala.collection.convert.WrapAsJava$;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Spark interpreter for Zeppelin.
- *
- */
-public class OldSparkInterpreter extends AbstractSparkInterpreter {
-  public static Logger logger = LoggerFactory.getLogger(OldSparkInterpreter.class);
-
-  private SparkZeppelinContext z;
-  private SparkILoop interpreter;
-  /**
-   * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
-   * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
-   */
-  private Object intp;
-  private SparkConf conf;
-  private static SparkContext sc;
-  private static SQLContext sqlc;
-  private static InterpreterHookRegistry hooks;
-  private static SparkEnv env;
-  private static Object sparkSession;    // spark 2.x
-  private static SparkListener sparkListener;
-  private static AbstractFile classOutputDir;
-  private static Integer sharedInterpreterLock = new Integer(0);
-  private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
-
-  private InterpreterOutputStream out;
-  private SparkDependencyResolver dep;
-  private static String sparkUrl;
-
-  /**
-   * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
-   */
-  private Object completer = null;
-
-  private Map<String, Object> binder;
-  private SparkVersion sparkVersion;
-  private static File outputDir;          // class outputdir for scala 2.11
-  private Object classServer;      // classserver for scala 2.11
-  private JavaSparkContext jsc;
-  private boolean enableSupportedVersionCheck;
-
-  private SparkShims sparkShims;
-
-  public OldSparkInterpreter(Properties property) {
-    super(property);
-    out = new InterpreterOutputStream(logger);
-  }
-
-  public OldSparkInterpreter(Properties property, SparkContext sc) {
-    this(property);
-    this.sc = sc;
-    env = SparkEnv.get();
-  }
-
-  public SparkContext getSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (sc == null) {
-        sc = createSparkContext();
-        env = SparkEnv.get();
-      }
-      return sc;
-    }
-  }
-
-  public JavaSparkContext getJavaSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (jsc == null) {
-        jsc = JavaSparkContext.fromSparkContext(sc);
-      }
-      return jsc;
-    }
-  }
-
-  public boolean isSparkContextInitialized() {
-    synchronized (sharedInterpreterLock) {
-      return sc != null;
-    }
-  }
-
-  private boolean useHiveContext() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
-  }
-
-  /**
-   * See org.apache.spark.sql.SparkSession.hiveClassesArePresent
-   * @return
-   */
-  private boolean hiveClassesArePresent() {
-    try {
-      this.getClass().forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
-      this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
-      return true;
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return false;
-    }
-  }
-
-  private boolean importImplicit() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
-  }
-
-  public Object getSparkSession() {
-    synchronized (sharedInterpreterLock) {
-      if (sparkSession == null) {
-        createSparkSession();
-      }
-      return sparkSession;
-    }
-  }
-
-  public SQLContext getSQLContext() {
-    synchronized (sharedInterpreterLock) {
-      if (Utils.isSpark2()) {
-        return getSQLContext_2();
-      } else {
-        return getSQLContext_1();
-      }
-    }
-  }
-
-  /**
-   * Get SQLContext for spark 2.x
-   */
-  private SQLContext getSQLContext_2() {
-    if (sqlc == null) {
-      sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
-    }
-    return sqlc;
-  }
-
-  public SQLContext getSQLContext_1() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
-          sqlc = new SQLContext(getSparkContext());
-        }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
-      }
-    }
-    return sqlc;
-  }
-
-
-  public SparkDependencyResolver getDependencyResolver() {
-    if (dep == null) {
-      dep = new SparkDependencyResolver(
-          (Global) Utils.invokeMethod(intp, "global"),
-          (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"),
-          sc,
-          getProperty("zeppelin.dep.localrepo"),
-          getProperty("zeppelin.dep.additionalRemoteRepository"));
-    }
-    return dep;
-  }
-
-  public boolean isYarnMode() {
-    String master = getProperty("master");
-    if (master == null) {
-      master = getProperty("spark.master", "local[*]");
-    }
-    return master.startsWith("yarn");
-  }
-
-  /**
-   * Spark 2.x
-   * Create SparkSession
-   */
-  public Object createSparkSession() {
-    // use local mode for embedded spark mode when spark.master is not found
-    conf.setIfMissing("spark.master", "local");
-    logger.info("------ Create new SparkSession {} -------", conf.get("spark.master"));
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    if (outputDir != null) {
-      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-
-    Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
-    Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
-    Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
-
-    if (useHiveContext()) {
-      if (hiveClassesArePresent()) {
-        Utils.invokeMethod(builder, "enableHiveSupport");
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support");
-      } else {
-        Utils.invokeMethod(builder, "config",
-            new Class[]{ String.class, String.class},
-            new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support use in-memory catalogImplementation");
-      }
-    } else {
-      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-      logger.info("Created Spark session");
-    }
-
-    return sparkSession;
-  }
-
-  public SparkContext createSparkContext() {
-    if (Utils.isSpark2()) {
-      return createSparkContext_2();
-    } else {
-      return createSparkContext_1();
-    }
-  }
-
-  /**
-   * Create SparkContext for spark 2.x
-   * @return
-   */
-  private SparkContext createSparkContext_2() {
-    return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
-  }
-
-  public SparkContext createSparkContext_1() {
-    // use local mode for embedded spark mode when spark.master is not found
-    if (!conf.contains("spark.master")) {
-      conf.setMaster("local");
-    }
-    logger.info("------ Create new SparkContext {} -------", conf.get("spark.master"));
-
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    String[] jars = null;
-
-    if (Utils.isScala2_10()) {
-      jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars");
-    } else {
-      jars = (String[]) Utils.invokeStaticMethod(
-          Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars");
-    }
-
-    String classServerUri = null;
-    String replClassOutputDirectory = null;
-
-    try { // in case of spark 1.1x, spark 1.2x
-      Method classServer = intp.getClass().getMethod("classServer");
-      Object httpServer = classServer.invoke(intp);
-      classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      // continue
-    }
-
-    if (classServerUri == null) {
-      try { // for spark 1.3x
-        Method classServer = intp.getClass().getMethod("classServerUri");
-        classServerUri = (String) classServer.invoke(intp);
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        // continue instead of: throw new InterpreterException(e);
-        // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method
-        logger.warn(String.format("Spark method classServerUri not available due to: [%s]",
-            e.getMessage()));
-      }
-    }
-
-    if (classServerUri == null) {
-      try { // for RcpEnv
-        Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory");
-        File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp);
-        replClassOutputDirectory = classOutputDirectory.getAbsolutePath();
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        // continue
-      }
-    }
-
-    if (Utils.isScala2_11()) {
-      classServer = createHttpServer(outputDir);
-      Utils.invokeMethod(classServer, "start");
-      classServerUri = (String) Utils.invokeMethod(classServer, "uri");
-    }
-
-    if (classServerUri != null) {
-      conf.set("spark.repl.class.uri", classServerUri);
-    }
-
-    if (replClassOutputDirectory != null) {
-      conf.set("spark.repl.class.outputDir", replClassOutputDirectory);
-    }
-
-    if (jars.length > 0) {
-      conf.setJars(jars);
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-    SparkContext sparkContext = new SparkContext(conf);
-    return sparkContext;
-  }
-
-  static final String toString(Object o) {
-    return (o instanceof String) ? (String) o : "";
-  }
-
-  public static boolean useSparkSubmit() {
-    return null != System.getenv("SPARK_SUBMIT");
-  }
-
-  public boolean printREPLOutput() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput"));
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
-        getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
-
-    // set properties and do login before creating any spark stuff for secured cluster
-    if (isYarnMode()) {
-      System.setProperty("SPARK_YARN_MODE", "true");
-    }
-    if (getProperties().containsKey("spark.yarn.keytab") &&
-        getProperties().containsKey("spark.yarn.principal")) {
-      try {
-        String keytab = getProperties().getProperty("spark.yarn.keytab");
-        String principal = getProperties().getProperty("spark.yarn.principal");
-        UserGroupInformation.loginUserFromKeytab(principal, keytab);
 
 Review comment:
   This sets the Kerberos authentication information according to the configuration.
   Can the same logic be added to the new spark interpreter?
   This is very useful.
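
   For reference, the logic under discussion amounts to the minimal sketch below. It only restates the behaviour of the deleted OldSparkInterpreter.open() quoted above; the class name KerberosLoginSketch and the helper loginIfConfigured are illustrative, not from the Zeppelin code base, and it assumes the new interpreter would read the same spark.yarn.keytab and spark.yarn.principal properties.

       import org.apache.hadoop.security.UserGroupInformation;

       import java.io.IOException;
       import java.util.Properties;

       // Illustrative only: class and method names are hypothetical.
       public class KerberosLoginSketch {

         // Mirrors the deleted OldSparkInterpreter.open() logic quoted above:
         // log in from the keytab before any Spark context is created, so that
         // a Kerberos-secured cluster accepts the interpreter process.
         static void loginIfConfigured(Properties props) throws IOException {
           String keytab = props.getProperty("spark.yarn.keytab");
           String principal = props.getProperty("spark.yarn.principal");
           if (keytab != null && principal != null) {
             // Hadoop's UserGroupInformation performs the kinit-equivalent
             // login for the current JVM from the given principal and keytab.
             UserGroupInformation.loginUserFromKeytab(principal, keytab);
           }
         }
       }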

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services