You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2015/01/30 21:55:01 UTC

svn commit: r1656116 - in /hive/branches/branch-1.1: ./ itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/ ql/...

Author: brock
Date: Fri Jan 30 20:55:01 2015
New Revision: 1656116

URL: http://svn.apache.org/r1656116
Log:
HIVE-9410: ClassNotFoundException occurs during hive query case execution with UDF defined [Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/branch-1.1/ql/src/test/queries/clientpositive/lateral_view_explode2.q
      - copied unchanged from r1654553, hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
    hive/branches/branch-1.1/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
      - copied unchanged from r1654553, hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
    hive/branches/branch-1.1/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
      - copied unchanged from r1654553, hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
    hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
      - copied unchanged from r1654553, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
Modified:
    hive/branches/branch-1.1/   (props changed)
    hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties
    hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Propchange: hive/branches/branch-1.1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 30 20:55:01 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414
+/hive/branches/spark:1608589-1654414,1654553
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856
 /hive/trunk:1655202,1655210,1655213,1655436,1655460,1655894-1655895,1656114

Modified: hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties Fri Jan 30 20:55:01 2015
@@ -997,4 +997,5 @@ spark.query.files=add_part_multiple.q, \
   vectorized_shufflejoin.q, \
   vectorized_string_funcs.q, \
   vectorized_timestamp_funcs.q, \
-  windowing.q
+  windowing.q, \
+  lateral_view_explode2.q

Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Jan 30 20:55:01 2015
@@ -207,6 +207,7 @@ public final class Utilities {
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";
 
   /**
    * ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
     Path path = null;
     InputStream in = null;
     try {
+      String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+      if (engine.equals("spark")) {
+        // TODO Add jars to the current thread's context classloader, as this may be invoked by the Spark driver
+        // in other threads; this should be unnecessary once SPARK-5377 is resolved.
+        String addedJars = conf.get(HIVE_ADDED_JARS);
+        if (addedJars != null && !addedJars.isEmpty()) {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+          Thread.currentThread().setContextClassLoader(newLoader);
+        }
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;

Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Fri Jan 30 20:55:01 2015
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -26,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@ import org.apache.hive.spark.client.JobC
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wrap a spark job request and send to an remote SparkContext.
@@ -208,6 +210,15 @@ public class RemoteHiveSparkClient imple
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
+      // may need to load classes from this jar in other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
 
@@ -234,7 +245,6 @@ public class RemoteHiveSparkClient imple
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 
 }

Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Fri Jan 30 20:55:01 2015
@@ -53,4 +53,9 @@ public interface JobContext {
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return the paths of all jars added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }

Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Fri Jan 30 20:55:01 2015
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@ class JobContextImpl implements JobConte
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
 
@@ -57,6 +60,11 @@ class JobContextImpl implements JobConte
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }

Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Fri Jan 30 20:55:01 2015
@@ -529,6 +529,9 @@ class SparkClientImpl implements SparkCl
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      // Subsequent remote jobs may refer to classes in this jar and would be executed in a different
+      // thread, so record this jar path in JobContext for later use.
+      jc.getAddedJars().add(path);
       return null;
     }