Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/24 17:07:41 UTC

svn commit: r1654553 - in /hive/branches/spark: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/ ql/src/test...

Author: xuefu
Date: Sat Jan 24 16:07:40 2015
New Revision: 1654553

URL: http://svn.apache.org/r1654553
Log:
HIVE-9410: ClassNotFoundException occurs during hive query case execution with UDF defined [Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
    hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Sat Jan 24 16:07:40 2015
@@ -996,4 +996,5 @@ spark.query.files=add_part_multiple.q, \
   vectorized_shufflejoin.q, \
   vectorized_string_funcs.q, \
   vectorized_timestamp_funcs.q, \
-  windowing.q
+  windowing.q, \
+  lateral_view_explode2.q

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jan 24 16:07:40 2015
@@ -207,6 +207,7 @@ public final class Utilities {
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";
 
   /**
    * ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
     Path path = null;
     InputStream in = null;
     try {
+      String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+      if (engine.equals("spark")) {
+      // TODO: Add the jars to the current thread's context classloader, since this may be
+      // invoked by Spark driver threads; this should become unnecessary once SPARK-5377 is resolved.
+        String addedJars = conf.get(HIVE_ADDED_JARS);
+        if (addedJars != null && !addedJars.isEmpty()) {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+          Thread.currentThread().setContextClassLoader(newLoader);
+        }
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
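
The change above extends the thread context classloader before the plan is deserialized, so that UDF classes from user jars resolve inside Spark driver threads. A minimal sketch of the underlying pattern, assuming local jar paths and a URLClassLoader-based parent (ClassPathSketch and extend are illustrative names, not Hive's Utilities.addToClassPath itself):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ClassPathSketch {
      // Wrap the parent loader in a new URLClassLoader that also sees the jars.
      static ClassLoader extend(ClassLoader parent, String[] jarPaths) throws Exception {
        URL[] urls = new URL[jarPaths.length];
        for (int i = 0; i < jarPaths.length; i++) {
          // Convert each local jar path to a file: URL.
          urls[i] = new File(jarPaths[i]).toURI().toURL();
        }
        // The new loader delegates to the parent first, so existing classes win.
        return new URLClassLoader(urls, parent);
      }
    }

Setting the result back as the thread's context classloader (as the diff does) makes subsequent lookups through the context classloader, e.g. Class.forName with an explicit loader, see the added jars.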

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Sat Jan 24 16:07:40 2015
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -26,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@ import org.apache.hive.spark.client.JobC
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wraps a Spark job request and sends it to a remote SparkContext.
@@ -208,6 +210,15 @@ public class RemoteHiveSparkClient imple
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add the jars to the current thread's classloader dynamically, and record the jar paths
+      // in the JobConf, as Spark may need to load classes from these jars in other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
 
@@ -234,7 +245,6 @@ public class RemoteHiveSparkClient imple
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 
 }
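
JobStatusJob.call above bridges the two places that need the jars: it extends the classloader of the thread running the job, and it serializes the jar list into the JobConf under hive.added.jars so Utilities.getBaseWork can repeat the trick in other threads. A small sketch of that round trip, assuming the ';' separator chosen by the patch (the demo paths are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.mapred.JobConf;

    public class AddedJarsRoundTrip {
      public static void main(String[] args) {
        List<String> addedJars = Arrays.asList("/tmp/udf-a.jar", "/tmp/udf-b.jar");
        JobConf conf = new JobConf();
        // Producer side (JobStatusJob.call): record the jars in the conf.
        conf.set("hive.added.jars", StringUtils.join(addedJars, ";"));
        // Consumer side (Utilities.getBaseWork): split them back out before
        // extending the thread context classloader.
        String[] paths = conf.get("hive.added.jars").split(";");
        System.out.println(Arrays.toString(paths));
      }
    }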

Added: hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q Sat Jan 24 16:07:40 2015
@@ -0,0 +1,9 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2';
+
+EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+DROP TEMPORARY FUNCTION explode2;
\ No newline at end of file

Added: hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out Sat Jan 24 16:07:40 2015
@@ -0,0 +1,102 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+            Lateral View Forward
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                Lateral View Join Operator
+                  outputColumnNames: _col5, _col6
+                  Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    keys: _col5 (type: int), _col6 (type: int)
+                    mode: hash
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: int)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+              Select Operator
+                expressions: array(1,2,3) (type: array<int>)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                UDTF Operator
+                  Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                  function name: explode
+                  Lateral View Join Operator
+                    outputColumnNames: _col5, _col6
+                    Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      keys: _col5 (type: int), _col6 (type: int)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: int)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: int), KEY._col1 (type: int)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: int)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+            Limit
+              Number of rows: 3
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1	1
+2	2
+3	3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2

Added: hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out Sat Jan 24 16:07:40 2015
@@ -0,0 +1,108 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                  Lateral View Forward
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                      Lateral View Join Operator
+                        outputColumnNames: _col5, _col6
+                        Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          keys: _col5 (type: int), _col6 (type: int)
+                          mode: hash
+                          outputColumnNames: _col0, _col1
+                          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                          Reduce Output Operator
+                            key expressions: _col0 (type: int), _col1 (type: int)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                    Select Operator
+                      expressions: array(1,2,3) (type: array<int>)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                      UDTF Operator
+                        Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                        function name: explode
+                        Lateral View Join Operator
+                          outputColumnNames: _col5, _col6
+                          Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            keys: _col5 (type: int), _col6 (type: int)
+                            mode: hash
+                            outputColumnNames: _col0, _col1
+                            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                            Reduce Output Operator
+                              key expressions: _col0 (type: int), _col1 (type: int)
+                              sort order: ++
+                              Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int), KEY._col1 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: int), _col1 (type: int)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                  Limit
+                    Number of rows: 3
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+2	2
+1	1
+3	3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Sat Jan 24 16:07:40 2015
@@ -53,4 +53,9 @@ public interface JobContext {
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return all jar paths that were added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Sat Jan 24 16:07:40 2015
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@ class JobContextImpl implements JobConte
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
 
@@ -57,6 +60,11 @@ class JobContextImpl implements JobConte
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
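
addedJars is a CopyOnWriteArrayList because AddJarJob instances may append paths from one job thread while another job thread iterates the list; a copy-on-write list gives readers a consistent snapshot without locking. A minimal sketch of that property (names and paths are illustrative):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class AddedJarsDemo {
      public static void main(String[] args) throws InterruptedException {
        final List<String> addedJars = new CopyOnWriteArrayList<String>();
        addedJars.add("/tmp/udf-a.jar");
        Thread writer = new Thread(new Runnable() {
          public void run() {
            // A concurrent add never throws ConcurrentModificationException
            // for iterating readers.
            addedJars.add("/tmp/udf-b.jar");
          }
        });
        writer.start();
        for (String jar : addedJars) {
          System.out.println(jar); // iterates a stable snapshot
        }
        writer.join();
      }
    }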

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1654553&r1=1654552&r2=1654553&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Sat Jan 24 16:07:40 2015
@@ -529,6 +529,9 @@ class SparkClientImpl implements SparkCl
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      // A subsequent remote job may refer to classes in this jar and may be executed in a
+      // different thread, so record this jar path in the JobContext for later use.
+      jc.getAddedJars().add(path);
       return null;
     }
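
Here sc().addJar ships the jar to executors for task-side loading, while the new getAddedJars().add(path) records it so driver-side job threads can extend their own classloaders. A hedged sketch of a job that consumes the recorded paths, mirroring the JobStatusJob prologue above and assuming the spark-client Job interface whose call(JobContext) signature appears in these snippets (UseAddedJarsJob is an illustrative name, not part of the patch):

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hive.spark.client.Job;
    import org.apache.hive.spark.client.JobContext;
    import org.apache.hive.spark.client.SparkClientUtilities;

    public class UseAddedJarsJob implements Job<Serializable> {
      @Override
      public Serializable call(JobContext jc) throws Exception {
        List<String> jars = jc.getAddedJars();
        if (jars != null && !jars.isEmpty()) {
          // Make previously added jars visible to this thread before doing work.
          SparkClientUtilities.addToClassPath(jars.toArray(new String[jars.size()]));
        }
        return null;
      }
    }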
 

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java?rev=1654553&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java Sat Jan 24 16:07:40 2015
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SparkClientUtilities {
+  protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+
+  /**
+   * Add new elements to the classpath.
+   *
+   * @param newPaths Array of classpath elements
+   */
+  public static void addToClassPath(String[] newPaths) throws Exception {
+    ClassLoader cloader = Thread.currentThread().getContextClassLoader();
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Lists.newArrayList(loader.getURLs());
+
+    for (String newPath : newPaths) {
+      URL newUrl = urlFromPathString(newPath);
+      if (newUrl != null && !curPath.contains(newUrl)) {
+        curPath.add(newUrl);
+        LOG.info("Added jar[" + newUrl + "] to classpath.");
+      }
+    }
+
+    URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+    Thread.currentThread().setContextClassLoader(newLoader);
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be a plain filesystem path, or can start with file:/ or file:///.
+   * @param path path string
+   * @return a URL for the path, or null if the path cannot be parsed
+   */
+  private static URL urlFromPathString(String path) {
+    URL url = null;
+    try {
+      if (StringUtils.indexOf(path, "file:/") == 0) {
+        url = new URL(path);
+      } else {
+        url = new File(path).toURL();
+      }
+    } catch (Exception err) {
+      LOG.error("Bad URL " + path + ", ignoring path");
+    }
+    return url;
+  }
+}
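
A short usage sketch for the new utility, assuming /tmp/hive-contrib.jar exists locally and contains the contrib UDTF exercised by the new test (the jar path is illustrative):

    import org.apache.hive.spark.client.SparkClientUtilities;

    public class AddToClassPathDemo {
      public static void main(String[] args) throws Exception {
        SparkClientUtilities.addToClassPath(new String[] { "/tmp/hive-contrib.jar" });
        // The thread context classloader now resolves classes from the jar;
        // without this step the lookup below is the ClassNotFoundException
        // reported in HIVE-9410.
        Class<?> udtf = Class.forName(
            "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2",
            true, Thread.currentThread().getContextClassLoader());
        System.out.println("Loaded " + udtf.getName());
      }
    }

Note that addToClassPath casts the context classloader to URLClassLoader, which holds for the Java 7 era driver threads this patch targets but would fail with a ClassCastException under a non-URLClassLoader context loader.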