You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/15 15:33:05 UTC
svn commit: r1679562 - in /pig/branches/spark/src/org/apache/pig:
backend/hadoop/executionengine/spark/SparkLauncher.java impl/PigContext.java
Author: xuefu
Date: Fri May 15 13:33:05 2015
New Revision: 1679562
URL: http://svn.apache.org/r1679562
Log:
PIG-4295: Enable unit test TestPigContext for spark (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/impl/PigContext.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1679562&r1=1679561&r2=1679562&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 15 13:33:05 2015
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
@@ -131,6 +132,7 @@ public class SparkLauncher extends Launc
PigContext pigContext) throws Exception {
if (LOG.isDebugEnabled())
LOG.debug(physicalPlan);
+ saveUdfImportList(pigContext);
JobConf jobConf = SparkUtil.newJobConf(pigContext);
jobConf.set(PigConstants.LOCAL_CODE_DIR,
System.getProperty("java.io.tmpdir"));
@@ -638,4 +640,9 @@ public class SparkLauncher extends Launc
// TODO Auto-generated method stub
}
+
+ private void saveUdfImportList(PigContext pigContext) { // Copies the current package import list into pigContext's properties.
+ String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList()); // flatten the list to one comma-separated string
+ pigContext.getProperties().setProperty("udf.import.list", udfImportList); // same key that PigContext.writeObject reads back as a fallback
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1679562&r1=1679561&r2=1679562&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri May 15 13:33:05 2015
@@ -17,6 +17,9 @@
*/
package org.apache.pig.impl;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -26,6 +29,8 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
@@ -908,4 +913,24 @@ public class PigContext implements Seria
classloader = new ContextClassLoader(cl);
}
}
+
+ private void writeObject(ObjectOutputStream out) throws IOException { // Serialization hook: default fields first, then the UDF import list as one extra object.
+ out.defaultWriteObject();
+ ArrayList<String> udfImportList = new ArrayList<String>(); // stays empty when neither source below provides a list
+ if (packageImportList.get() == null) { // nothing held in packageImportList (presumably a ThreadLocal - confirm), so fall back to the property
+ String udfImportListStr = properties.getProperty("udf.import.list"); // comma-separated value; SparkLauncher.saveUdfImportList sets this key
+ if (udfImportListStr != null) {
+ udfImportList = Lists.newArrayList(Splitter.on(",").split(udfImportListStr)); // NOTE(review): no omitEmptyStrings(), so empty segments appear to be kept - confirm that is intended
+ }
+ } else {
+ udfImportList = packageImportList.get(); // written as-is, no defensive copy
+ }
+ out.writeObject(udfImportList); // readObject must read exactly one object at this position
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { // Deserialization hook: mirrors writeObject's layout.
+ in.defaultReadObject();
+ ArrayList<String> udfImportList = (ArrayList<String>) in.readObject(); // unchecked cast; relies on writeObject having written an ArrayList<String> right after the default fields
+ packageImportList.set(udfImportList); // install the deserialized list into packageImportList
+ }
}