You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/15 15:33:05 UTC

svn commit: r1679562 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/SparkLauncher.java impl/PigContext.java

Author: xuefu
Date: Fri May 15 13:33:05 2015
New Revision: 1679562

URL: http://svn.apache.org/r1679562
Log:
PIG-4295: Enable unit test TestPigContext for spark (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/impl/PigContext.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1679562&r1=1679561&r2=1679562&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 15 13:33:05 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 import java.io.File;
@@ -131,6 +132,7 @@ public class SparkLauncher extends Launc
 			PigContext pigContext) throws Exception {
 		if (LOG.isDebugEnabled())
 		    LOG.debug(physicalPlan);
+        saveUdfImportList(pigContext);
 		JobConf jobConf = SparkUtil.newJobConf(pigContext);
 		jobConf.set(PigConstants.LOCAL_CODE_DIR,
 				System.getProperty("java.io.tmpdir"));
@@ -638,4 +640,9 @@ public class SparkLauncher extends Launc
 		// TODO Auto-generated method stub
 
 	}
+
+    private void saveUdfImportList(PigContext pigContext) {
+        String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList());
+        pigContext.getProperties().setProperty("udf.import.list", udfImportList);
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1679562&r1=1679561&r2=1679562&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri May 15 13:33:05 2015
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.impl;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -26,6 +29,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.lang.reflect.Constructor;
@@ -908,4 +913,24 @@ public class PigContext implements Seria
             classloader = new ContextClassLoader(cl);
         }
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        ArrayList<String> udfImportList = new ArrayList<String>();
+        if (packageImportList.get() == null) {
+            String udfImportListStr = properties.getProperty("udf.import.list");
+            if (udfImportListStr != null) {
+                udfImportList = Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+            }
+        } else {
+            udfImportList = packageImportList.get();
+        }
+        out.writeObject(udfImportList);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        ArrayList<String> udfImportList = (ArrayList<String>) in.readObject();
+        packageImportList.set(udfImportList);
+    }
 }