You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/24 17:07:41 UTC
svn commit: r1654553 - in /hive/branches/spark: itests/src/test/resources/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
ql/src/test...
Author: xuefu
Date: Sat Jan 24 16:07:40 2015
New Revision: 1654553
URL: http://svn.apache.org/r1654553
Log:
HIVE-9410: ClassNotFoundException occurs during hive query case execution with UDF defined [Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Sat Jan 24 16:07:40 2015
@@ -996,4 +996,5 @@ spark.query.files=add_part_multiple.q, \
vectorized_shufflejoin.q, \
vectorized_string_funcs.q, \
vectorized_timestamp_funcs.q, \
- windowing.q
+ windowing.q, \
+ lateral_view_explode2.q
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jan 24 16:07:40 2015
@@ -207,6 +207,7 @@ public final class Utilities {
public static final String INPUT_NAME = "iocontext.input.name";
public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+ public static final String HIVE_ADDED_JARS = "hive.added.jars";
/**
* ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
Path path = null;
InputStream in = null;
try {
+ String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+ if (engine.equals("spark")) {
+ // TODO: add the jars to the current thread's context class loader, since this code may be
+ // invoked from Spark driver threads. This workaround becomes unnecessary once SPARK-5377 is resolved.
+ String addedJars = conf.get(HIVE_ADDED_JARS);
+ if (addedJars != null && !addedJars.isEmpty()) {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+ Thread.currentThread().setContextClassLoader(newLoader);
+ }
+ }
+
path = getPlanPath(conf, name);
LOG.info("PLAN PATH = " + path);
assert path != null;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Sat Jan 24 16:07:40 2015
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
@@ -26,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@ import org.apache.hive.spark.client.JobC
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
/**
* RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
* wrap a spark job request and send to an remote SparkContext.
@@ -208,6 +210,15 @@ public class RemoteHiveSparkClient imple
@Override
public Serializable call(JobContext jc) throws Exception {
JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+ // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
+ // may need to load classes from this jar in other threads.
+ List<String> addedJars = jc.getAddedJars();
+ if (addedJars != null && !addedJars.isEmpty()) {
+ SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+ }
+
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
@@ -234,7 +245,6 @@ public class RemoteHiveSparkClient imple
jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
return null;
}
-
}
}
Added: hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q Sat Jan 24 16:07:40 2015
@@ -0,0 +1,9 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2';
+
+EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+DROP TEMPORARY FUNCTION explode2;
\ No newline at end of file
Added: hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out Sat Jan 24 16:07:40 2015
@@ -0,0 +1,102 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Lateral View Forward
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Lateral View Join Operator
+ outputColumnNames: _col5, _col6
+ Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col5 (type: int), _col6 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Select Operator
+ expressions: array(1,2,3) (type: array<int>)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ UDTF Operator
+ Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ function name: explode
+ Lateral View Join Operator
+ outputColumnNames: _col5, _col6
+ Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col5 (type: int), _col6 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: int), KEY._col1 (type: int)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: int), _col1 (type: int)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ Limit
+ Number of rows: 3
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 3
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1 1
+2 2
+3 3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2
Added: hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out Sat Jan 24 16:07:40 2015
@@ -0,0 +1,108 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (GROUP, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Lateral View Forward
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Lateral View Join Operator
+ outputColumnNames: _col5, _col6
+ Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col5 (type: int), _col6 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Select Operator
+ expressions: array(1,2,3) (type: array<int>)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ UDTF Operator
+ Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ function name: explode
+ Lateral View Join Operator
+ outputColumnNames: _col5, _col6
+ Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col5 (type: int), _col6 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: int), KEY._col1 (type: int)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: int), _col1 (type: int)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ Limit
+ Number of rows: 3
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 3
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+2 2
+1 1
+3 3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Sat Jan 24 16:07:40 2015
@@ -53,4 +53,9 @@ public interface JobContext {
*/
Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
+ /**
+ * Returns the paths of all jars that were added through AddJarJob.
+ */
+ List<String> getAddedJars();
+
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Sat Jan 24 16:07:40 2015
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hive.spark.counter.SparkCounters;
@@ -32,11 +33,13 @@ class JobContextImpl implements JobConte
private final JavaSparkContext sc;
private final ThreadLocal<MonitorCallback> monitorCb;
private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+ private final List<String> addedJars;
public JobContextImpl(JavaSparkContext sc) {
this.sc = sc;
this.monitorCb = new ThreadLocal<MonitorCallback>();
monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+ addedJars = new CopyOnWriteArrayList<String>();
}
@@ -57,6 +60,11 @@ class JobContextImpl implements JobConte
return monitoredJobs;
}
+ @Override
+ public List<String> getAddedJars() {
+ return addedJars;
+ }
+
void setMonitorCb(MonitorCallback cb) {
monitorCb.set(cb);
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Sat Jan 24 16:07:40 2015
@@ -529,6 +529,9 @@ class SparkClientImpl implements SparkCl
@Override
public Serializable call(JobContext jc) throws Exception {
jc.sc().addJar(path);
+ // Subsequent remote jobs may refer to classes in this jar, and they may run in a different
+ // thread, so record the jar path in the JobContext for later use.
+ jc.getAddedJars().add(path);
return null;
}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java Sat Jan 24 16:07:40 2015
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility methods for the Spark remote client: dynamically extends the current
+ * thread's context class loader with user-added jars (e.g. UDF jars) so classes
+ * from them can be resolved on the remote driver side.
+ */
+public class SparkClientUtilities {
+  protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+
+  /**
+   * Add new elements to the classpath of the current thread's context class loader.
+   * A new URLClassLoader wrapping the existing one is installed as the context
+   * class loader; paths that are already present are skipped.
+   *
+   * @param newPaths Array of classpath elements (plain paths or file:/ URLs)
+   * @throws Exception if the context class loader is not a URLClassLoader
+   */
+  public static void addToClassPath(String[] newPaths) throws Exception {
+    ClassLoader cloader = Thread.currentThread().getContextClassLoader();
+    if (!(cloader instanceof URLClassLoader)) {
+      // Fail with a clear message instead of an opaque ClassCastException.
+      throw new IllegalStateException("Context class loader is not a URLClassLoader: " + cloader);
+    }
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Lists.newArrayList(loader.getURLs());
+
+    for (String newPath : newPaths) {
+      URL newUrl = urlFromPathString(newPath);
+      if (newUrl != null && !containsUrl(curPath, newUrl)) {
+        curPath.add(newUrl);
+        LOG.info("Added jar[" + newUrl + "] to classpath.");
+      }
+    }
+
+    URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+    Thread.currentThread().setContextClassLoader(newLoader);
+  }
+
+  /**
+   * Membership test by external string form. URL.equals() may perform blocking
+   * host name resolution (see java.net.URL javadoc), so avoid List.contains(URL).
+   */
+  private static boolean containsUrl(List<URL> urls, URL candidate) {
+    String target = candidate.toExternalForm();
+    for (URL url : urls) {
+      if (url.toExternalForm().equals(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be just a path, or can start with file:/, file:///
+   *
+   * @param path path string
+   * @return the corresponding URL, or null if the path cannot be converted
+   */
+  private static URL urlFromPathString(String path) {
+    URL url = null;
+    try {
+      if (StringUtils.indexOf(path, "file:/") == 0) {
+        url = new URL(path);
+      } else {
+        // File.toURL() is deprecated and does not escape special characters;
+        // go through toURI() instead.
+        url = new File(path).toURI().toURL();
+      }
+    } catch (Exception err) {
+      // Keep the cause in the log so bad paths can be diagnosed.
+      LOG.error("Bad URL " + path + ", ignoring path", err);
+    }
+    return url;
+  }
+}