You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/02/16 21:36:02 UTC
svn commit: r1245165 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/ src/org/apache/pig/impl/io/
src/org/apache/pig/impl/util/ test/org/apache/pig/test/
Author: dvryaboy
Date: Thu Feb 16 20:36:02 2012
New Revision: 1245165
URL: http://svn.apache.org/viewvc?rev=1245165&view=rev
Log:
PIG-2010: registered jars on distributed cache
Added:
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
pig/trunk/src/org/apache/pig/impl/util/JarManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb 16 20:36:02 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2010: registered jars on distributed cache (traviscrawford and julienledem via dvryaboy)
+
PIG-2533: Pig MR job exceptions masked on frontend (traviscrawford via dvryaboy)
PIG-2525: Support pluggable PigProcessNotifcationListeners on the command line (dvryaboy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Feb 16 20:36:02 2012
@@ -20,8 +20,10 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +39,7 @@ import org.apache.hadoop.filecache.Distr
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
@@ -55,9 +58,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -404,6 +407,13 @@ public class JobControlCompiler{
if (!pigContext.inIllustrator && pigContext.getExecType() != ExecType.LOCAL)
{
+
+ // Setup the DistributedCache for this job
+ for (URL extraJar : pigContext.extraJars) {
+ log.debug("Adding jar to DistributedCache: " + extraJar.toString());
+ putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
+ }
+
//Create the jar of all functions and classes required
File submitJarFile = File.createTempFile("Job", ".jar");
log.info("creating jar file "+submitJarFile.getName());
@@ -448,7 +458,6 @@ public class JobControlCompiler{
}
}
- // Setup the DistributedCache for this job
setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
"pig.streaming.ship.files", true);
setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
@@ -1158,28 +1167,20 @@ public class JobControlCompiler{
setupDistributedCache(pigContext, conf, paths, shipToCluster);
}
}
-
+
private static void setupDistributedCache(PigContext pigContext,
Configuration conf, String[] paths, boolean shipToCluster) throws IOException {
// Turn on the symlink feature
DistributedCache.createSymlink(conf);
-
+
for (String path : paths) {
path = path.trim();
if (path.length() != 0) {
Path src = new Path(path);
-
+
// Ensure that 'src' is a valid URI
- URI srcURI = null;
- try {
- srcURI = new URI(src.toString());
- } catch (URISyntaxException ue) {
- int errCode = 6003;
- String msg = "Invalid cache specification. " +
- "File doesn't exist: " + src;
- throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
- }
-
+ URI srcURI = toURI(src);
+
// Ship it to the cluster if necessary and add to the
// DistributedCache
if (shipToCluster) {
@@ -1187,7 +1188,7 @@ public class JobControlCompiler{
new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
FileSystem fs = dst.getFileSystem(conf);
fs.copyFromLocalFile(src, dst);
-
+
// Construct the dst#srcName uri for DistributedCache
URI dstURI = null;
try {
@@ -1244,74 +1245,146 @@ public class JobControlCompiler{
return symlink;
}
+
+ /**
+ * Ensure that 'src' is a valid URI
+ * @param src the source Path
+ * @return a URI for this path
+ * @throws ExecException
+ */
+ private static URI toURI(Path src) throws ExecException {
+ try {
+ return new URI(src.toString());
+ } catch (URISyntaxException ue) {
+ int errCode = 6003;
+ String msg = "Invalid cache specification. " +
+ "File doesn't exist: " + src;
+ throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
+ }
+ }
+
+ /**
+ * copies the jar at url to a temporary path in HDFS, adds that copy to the job classpath through
+ * the distributed cache, and records url.getPath() in pigContext.skipJars (nothing is returned)
+ * @param pigContext the pigContext
+ * @param conf the job conf
+ * @param url the url to be added to distributed cache
+ * @throws IOException
+ */
+ private static void putJarOnClassPathThroughDistributedCache(
+ PigContext pigContext,
+ Configuration conf,
+ URL url) throws IOException {
+
+ // Turn on the symlink feature
+ DistributedCache.createSymlink(conf);
+
+ // REGISTER always copies locally the jar file. see PigServer.registerJar()
+ Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+ // and add to the DistributedCache
+ DistributedCache.addFileToClassPath(pathInHDFS, conf);
+ pigContext.skipJars.add(url.getPath());
+ }
+
+ /**
+ * copy the file to hdfs in a temporary path
+ * @param pigContext the pig context
+ * @param conf the job conf
+ * @param url the url to ship to hdfs
+ * @return the location where it was shipped
+ * @throws IOException
+ */
+ private static Path shipToHDFS(
+ PigContext pigContext,
+ Configuration conf,
+ URL url) throws IOException {
+
+ String path = url.getPath();
+ int slash = path.lastIndexOf("/");
+ String suffix = slash == -1 ? path : path.substring(slash+1);
+
+ Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+ FileSystem fs = dst.getFileSystem(conf);
+ OutputStream os = fs.create(dst);
+ try {
+ IOUtils.copyBytes(url.openStream(), os, 4096, true);
+ } finally {
+ // copyBytes(..., true) closes both streams itself, but if url.openStream() throws before
+ // copyBytes is entered, nothing else would close os; hence this explicit close
+ os.close();
+ }
+ return dst;
+ }
+
+
private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
-
+
private PigContext pigContext = null;
-
- private Configuration conf = null;
-
- public JoinDistributedCacheVisitor(PhysicalPlan plan,
- PigContext pigContext, Configuration conf) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.pigContext = pigContext;
- this.conf = conf;
- }
-
- @Override
+
+ private Configuration conf = null;
+
+ public JoinDistributedCacheVisitor(PhysicalPlan plan,
+ PigContext pigContext, Configuration conf) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ this.pigContext = pigContext;
+ this.conf = conf;
+ }
+
+ @Override
public void visitFRJoin(POFRJoin join) throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
-
- // set up distributed cache for the replicated files
- FileSpec[] replFiles = join.getReplFiles();
- ArrayList<String> replicatedPath = new ArrayList<String>();
-
- FileSpec[] newReplFiles = new FileSpec[replFiles.length];
-
- // the first input is not replicated
- for (int i = 0; i < replFiles.length; i++) {
- // ignore fragmented file
- String symlink = "";
- if (i != join.getFragment()) {
- symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
- + Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
- + "_" + Long.toString(System.currentTimeMillis())
- + "_" + i;
- replicatedPath.add(replFiles[i].getFileName() + "#"
- + symlink);
- }
- newReplFiles[i] = new FileSpec(symlink,
- (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
- }
-
- join.setReplFiles(newReplFiles);
-
- try {
- setupDistributedCache(pigContext, conf, replicatedPath
- .toArray(new String[0]), false);
- } catch (IOException e) {
- String msg = "Internal error. Distributed cache could not " +
- "be set up for the replicated files";
- throw new VisitorException(msg, e);
- }
- }
-
- @Override
- public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
-
- String indexFile = join.getIndexFile();
-
- // merge join may not use an index file
- if (indexFile == null) return;
-
- try {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ // set up distributed cache for the replicated files
+ FileSpec[] replFiles = join.getReplFiles();
+ ArrayList<String> replicatedPath = new ArrayList<String>();
+
+ FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+
+ // the first input is not replicated
+ for (int i = 0; i < replFiles.length; i++) {
+ // ignore fragmented file
+ String symlink = "";
+ if (i != join.getFragment()) {
+ symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
+ + Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
+ + "_" + Long.toString(System.currentTimeMillis())
+ + "_" + i;
+ replicatedPath.add(replFiles[i].getFileName() + "#"
+ + symlink);
+ }
+ newReplFiles[i] = new FileSpec(symlink,
+ (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
+ }
+
+ join.setReplFiles(newReplFiles);
+
+ try {
+ setupDistributedCache(pigContext, conf, replicatedPath
+ .toArray(new String[0]), false);
+ } catch (IOException e) {
+ String msg = "Internal error. Distributed cache could not " +
+ "be set up for the replicated files";
+ throw new VisitorException(msg, e);
+ }
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ String indexFile = join.getIndexFile();
+
+ // merge join may not use an index file
+ if (indexFile == null) return;
+
+ try {
String symlink = addSingleFileToDistributedCache(pigContext,
conf, indexFile, "indexfile_");
join.setIndexFile(symlink);
@@ -1320,21 +1393,21 @@ public class JobControlCompiler{
"be set up for merge join index file";
throw new VisitorException(msg, e);
}
- }
-
- @Override
+ }
+
+ @Override
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
-
- String indexFile = mergeCoGrp.getIndexFileName();
-
- if (indexFile == null) throw new VisitorException("No index file");
-
- try {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ String indexFile = mergeCoGrp.getIndexFileName();
+
+ if (indexFile == null) throw new VisitorException("No index file");
+
+ try {
String symlink = addSingleFileToDistributedCache(pigContext,
conf, indexFile, "indexfile_mergecogrp_");
mergeCoGrp.setIndexFileName(symlink);
@@ -1344,40 +1417,40 @@ public class JobControlCompiler{
throw new VisitorException(msg, e);
}
}
- }
+ }
- private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-
- private PigContext pigContext = null;
- private Configuration conf = null;
-
- public UdfDistributedCacheVisitor(PhysicalPlan plan,
- PigContext pigContext,
- Configuration conf) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.pigContext = pigContext;
- this.conf = conf;
- }
-
- @Override
- public void visitUserFunc(POUserFunc func) throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
-
- // set up distributed cache for files indicated by the UDF
- String[] files = func.getCacheFiles();
- if (files == null) return;
-
- try {
- setupDistributedCache(pigContext, conf, files, false);
- } catch (IOException e) {
+ private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
+
+ private PigContext pigContext = null;
+ private Configuration conf = null;
+
+ public UdfDistributedCacheVisitor(PhysicalPlan plan,
+ PigContext pigContext,
+ Configuration conf) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ this.pigContext = pigContext;
+ this.conf = conf;
+ }
+
+ @Override
+ public void visitUserFunc(POUserFunc func) throws VisitorException {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ // set up distributed cache for files indicated by the UDF
+ String[] files = func.getCacheFiles();
+ if (files == null) return;
+
+ try {
+ setupDistributedCache(pigContext, conf, files, false);
+ } catch (IOException e) {
String msg = "Internal error. Distributed cache could not " +
- "be set up for the requested files";
+ "be set up for the requested files";
throw new VisitorException(msg, e);
- }
- }
- }
+ }
+ }
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Feb 16 20:36:02 2012
@@ -58,7 +58,7 @@ import org.apache.pig.impl.util.JarManag
public class PigContext implements Serializable {
private static final long serialVersionUID = 1L;
- private transient final Log log = LogFactory.getLog(getClass());
+ private static final Log log = LogFactory.getLog(PigContext.class);
public static final String JOB_NAME = "jobName";
public static final String JOB_NAME_PREFIX= "PigLatin";
Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Feb 16 20:36:02 2012
@@ -515,17 +515,21 @@ public class FileLocalizer {
}
public static Path getTemporaryPath(PigContext pigContext) throws IOException {
- ElementDescriptor relative = relativeRoot(pigContext);
-
- if (!relativeRoot(pigContext).exists()) {
- relativeRoot(pigContext).create();
- }
- ElementDescriptor elem=
- pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt());
- toDelete().push(elem);
- return ((HPath)elem).getPath();
+ return getTemporaryPath(pigContext, "");
}
-
+
+ public static Path getTemporaryPath(PigContext pigContext, String suffix) throws IOException {
+ ElementDescriptor relative = relativeRoot(pigContext);
+
+ if (!relativeRoot(pigContext).exists()) {
+ relativeRoot(pigContext).create();
+ }
+ ElementDescriptor elem=
+ pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt() + suffix);
+ toDelete().push(elem);
+ return ((HPath)elem).getPath();
+ }
+
public static String hadoopify(String filename, PigContext pigContext) throws IOException {
if (filename.startsWith(LOCAL_PREFIX)) {
filename = filename.substring(LOCAL_PREFIX.length());
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Thu Feb 16 20:36:02 2012
@@ -121,17 +121,16 @@ public class JarManager {
for (String scriptJar: pigContext.scriptJars) {
mergeJar(jarFile, scriptJar, null, contents);
}
- for (URL extraJar: pigContext.extraJars) {
- // log.error("Adding extra " + pigContext.extraJars.get(i));
- mergeJar(jarFile, extraJar, null, contents);
- }
for (String path: pigContext.scriptFiles) {
+ log.debug("Adding entry " + path + " to job jar" );
addStream(jarFile, path, new FileInputStream(new File(path)),contents);
}
for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+ log.debug("Adding entry " + entry.getKey() + " to job jar" );
addStream(jarFile, entry.getKey(), new FileInputStream(entry.getValue()),contents);
}
-
+
+ log.debug("Adding entry pigContext to job jar" );
jarFile.putNextEntry(new ZipEntry("pigContext"));
new ObjectOutputStream(jarFile).writeObject(pigContext);
jarFile.close();
@@ -177,7 +176,7 @@ public class JarManager {
private static void mergeJar(JarOutputStream jarFile, String jar, String prefix, Map<String, String> contents)
throws FileNotFoundException, IOException {
JarInputStream jarInput = new JarInputStream(new FileInputStream(jar));
-
+ log.debug("Adding jar " + jar + (prefix != null ? " for prefix "+prefix : "" ) + " to job jar" );
mergeJar(jarFile, jarInput, prefix, contents);
}
Added: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1245165&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Thu Feb 16 20:36:02 2012
@@ -0,0 +1,191 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaCompiler.CompilationTask;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobControlCompiler {
+
+ /**
+ * specifically tests that REGISTERED jars get added to distributed cache instead of merged into
+ * the job jar
+ * @throws Exception
+ */
+ @Test
+ public void testJarAddedToDistributedCache() throws Exception {
+
+ // creating a jar with a UDF *not* in the current classloader
+ File tmpFile = File.createTempFile("Some_", ".jar");
+ tmpFile.deleteOnExit();
+ String className = createTestJar(tmpFile);
+ final String testUDFFileName = className+".class";
+
+ // creating a hadoop-site.xml and making it visible to Pig
+ // making sure it is at the same location as for other tests to not pick up a
+ // conf from a previous test
+ File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+ conf_dir.mkdirs();
+ File hadoopSite = new File(conf_dir, "hadoop-site.xml");
+ hadoopSite.deleteOnExit();
+ FileWriter fw = new FileWriter(hadoopSite);
+ try {
+ fw.write("<?xml version=\"1.0\"?>\n");
+ fw.write("<?xml-stylesheet type=\"text/xsl\" href=\"nutch-conf.xsl\"?>\n");
+ fw.write("<configuration>\n");
+ fw.write("</configuration>\n");
+ } finally {
+ fw.close();
+ }
+ // making hadoop-site.xml visible to Pig as it REQUIRES!!! one when running in mapred mode
+ Thread.currentThread().setContextClassLoader(
+ new URLClassLoader(new URL[] {conf_dir.toURI().toURL()}));
+
+ // JobControlCompiler setup
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+ pigContext.connect();
+ pigContext.addJar(tmpFile.getAbsolutePath());
+ Configuration conf = new Configuration();
+ JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, conf);
+ MROperPlan plan = new MROperPlan();
+ MapReduceOper mro = new MapReduceOper(new OperatorKey());
+ mro.UDFs = new HashSet<String>();
+ mro.UDFs.add(className+"()");
+ plan.add(mro);
+
+ // compiling the job
+ JobControl jobControl = jobControlCompiler.compile(plan , "test");
+ JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
+
+ // verifying the jar gets on distributed cache
+ Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
+ Assert.assertEquals("size 1 for "+Arrays.toString(fileClassPaths), 1, fileClassPaths.length);
+ Path distributedCachePath = fileClassPaths[0];
+ Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
+ // hadoop bug requires path to not contain hdfs://hostname in front
+ Assert.assertTrue("starts with /: "+distributedCachePath,
+ distributedCachePath.toString().startsWith("/"));
+ Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
+ jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
+
+ // verifying the job jar does not contain the UDF
+// jobConf.writeXml(System.out);
+ File submitJarFile = new File(jobConf.get("mapred.jar"));
+ Assert.assertFalse("the mapred.jar should *not* contain the testUDF", jarContainsFileNamed(submitJarFile, testUDFFileName));
+
+ }
+
+ /**
+ * checks if the given file name is in the jar
+ * @param jarFile the jar to check
+ * @param name the name to find (full path in the jar)
+ * @return true if the name was found
+ * @throws IOException
+ */
+ private boolean jarContainsFileNamed(File jarFile, String name) throws IOException {
+ Enumeration<JarEntry> entries = new JarFile(jarFile).entries();
+ while (entries.hasMoreElements()) {
+ JarEntry entry = entries.nextElement();
+ if (entry.getName().equals(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * creates a jar containing a UDF not in the current classloader
+ * @param jarFile the jar to create
+ * @return the name of the class created (in the default package)
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ private String createTestJar(File jarFile) throws IOException, FileNotFoundException {
+
+ // creating the source .java file
+ File javaFile = File.createTempFile("TestUDF", ".java");
+ javaFile.deleteOnExit();
+ String className = javaFile.getName().substring(0, javaFile.getName().lastIndexOf('.'));
+ FileWriter fw = new FileWriter(javaFile);
+ try {
+ fw.write("import org.apache.pig.EvalFunc;\n");
+ fw.write("import org.apache.pig.data.Tuple;\n");
+ fw.write("import java.io.IOException;\n");
+ fw.write("public class "+className+" extends EvalFunc<String> {\n");
+ fw.write(" public String exec(Tuple input) throws IOException {\n");
+ fw.write(" return \"test\";\n");
+ fw.write(" }\n");
+ fw.write("}\n");
+ } finally {
+ fw.close();
+ }
+
+ // compiling it
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
+ Iterable<? extends JavaFileObject> compilationUnits1 = fileManager.getJavaFileObjects(javaFile);
+ CompilationTask task = compiler.getTask(null, fileManager, null, null, null, compilationUnits1);
+ task.call();
+
+ // here is the compiled file
+ File classFile = new File(javaFile.getParentFile(), className+".class");
+ Assert.assertTrue(classFile.exists());
+
+ // putting it in the jar
+ JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
+ try {
+ jos.putNextEntry(new ZipEntry(classFile.getName()));
+ try {
+ InputStream testClassContentIS = new FileInputStream(classFile);
+ try {
+ byte[] buffer = new byte[64000];
+ int n;
+ while ((n = testClassContentIS.read(buffer)) != -1) {
+ jos.write(buffer, 0, n);
+ }
+ } finally {
+ testClassContentIS.close();
+ }
+ }finally {
+ jos.closeEntry();
+ }
+ } finally {
+ jos.close();
+ }
+
+ return className;
+ }
+}