You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2015/01/30 21:55:01 UTC
svn commit: r1656116 - in /hive/branches/branch-1.1: ./
itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
ql/...
Author: brock
Date: Fri Jan 30 20:55:01 2015
New Revision: 1656116
URL: http://svn.apache.org/r1656116
Log:
HIVE-9410: ClassNotFoundException occurs during hive query case execution with UDF defined [Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/branch-1.1/ql/src/test/queries/clientpositive/lateral_view_explode2.q
- copied unchanged from r1654553, hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
hive/branches/branch-1.1/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
- copied unchanged from r1654553, hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
hive/branches/branch-1.1/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
- copied unchanged from r1654553, hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
- copied unchanged from r1654553, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
Modified:
hive/branches/branch-1.1/ (props changed)
hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
Propchange: hive/branches/branch-1.1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 30 20:55:01 2015
@@ -1,6 +1,6 @@
/hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
/hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414
+/hive/branches/spark:1608589-1654414,1654553
/hive/branches/tez:1494760-1622766
/hive/branches/vectorization:1466908-1527856
/hive/trunk:1655202,1655210,1655213,1655436,1655460,1655894-1655895,1656114
Modified: hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/branch-1.1/itests/src/test/resources/testconfiguration.properties Fri Jan 30 20:55:01 2015
@@ -997,4 +997,5 @@ spark.query.files=add_part_multiple.q, \
vectorized_shufflejoin.q, \
vectorized_string_funcs.q, \
vectorized_timestamp_funcs.q, \
- windowing.q
+ windowing.q, \
+ lateral_view_explode2.q
Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Jan 30 20:55:01 2015
@@ -207,6 +207,7 @@ public final class Utilities {
public static final String INPUT_NAME = "iocontext.input.name";
public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+ public static final String HIVE_ADDED_JARS = "hive.added.jars";
/**
* ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
Path path = null;
InputStream in = null;
try {
+ String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+ if (engine.equals("spark")) {
+ // TODO: Add the jars to the current thread's context classloader, as this code may be invoked by the Spark
+ // driver inside other threads; this should become unnecessary once SPARK-5377 is resolved.
+ String addedJars = conf.get(HIVE_ADDED_JARS);
+ if (addedJars != null && !addedJars.isEmpty()) {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+ Thread.currentThread().setContextClassLoader(newLoader);
+ }
+ }
+
path = getPlanPath(conf, name);
LOG.info("PLAN PATH = " + path);
assert path != null;
Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Fri Jan 30 20:55:01 2015
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
@@ -26,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@ import org.apache.hive.spark.client.JobC
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
/**
* RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
* wraps a Spark job request and sends it to a remote SparkContext.
@@ -208,6 +210,15 @@ public class RemoteHiveSparkClient imple
@Override
public Serializable call(JobContext jc) throws Exception {
JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+ // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
+ // may need to load classes from this jar in other threads.
+ List<String> addedJars = jc.getAddedJars();
+ if (addedJars != null && !addedJars.isEmpty()) {
+ SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+ }
+
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
@@ -234,7 +245,6 @@ public class RemoteHiveSparkClient imple
jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
return null;
}
-
}
}
Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Fri Jan 30 20:55:01 2015
@@ -53,4 +53,9 @@ public interface JobContext {
*/
Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
+ /**
+ * Returns the paths of all jars that were added through AddJarJob.
+ */
+ List<String> getAddedJars();
+
}
Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Fri Jan 30 20:55:01 2015
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hive.spark.counter.SparkCounters;
@@ -32,11 +33,13 @@ class JobContextImpl implements JobConte
private final JavaSparkContext sc;
private final ThreadLocal<MonitorCallback> monitorCb;
private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+ private final List<String> addedJars;
public JobContextImpl(JavaSparkContext sc) {
this.sc = sc;
this.monitorCb = new ThreadLocal<MonitorCallback>();
monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+ addedJars = new CopyOnWriteArrayList<String>();
}
@@ -57,6 +60,11 @@ class JobContextImpl implements JobConte
return monitoredJobs;
}
+ @Override
+ public List<String> getAddedJars() {
+ return addedJars;
+ }
+
void setMonitorCb(MonitorCallback cb) {
monitorCb.set(cb);
}
Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1656116&r1=1656115&r2=1656116&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Fri Jan 30 20:55:01 2015
@@ -529,6 +529,9 @@ class SparkClientImpl implements SparkCl
@Override
public Serializable call(JobContext jc) throws Exception {
jc.sc().addJar(path);
+ // Subsequent remote jobs may refer to classes in this jar, and those jobs are executed
+ // in a different thread, so we record this jar path in the JobContext for later use.
+ jc.getAddedJars().add(path);
return null;
}