You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/11 00:28:05 UTC
svn commit: r1566847 - in /pig/branches/tez/src/org/apache/pig:
backend/hadoop/executionengine/tez/TezPlanContainer.java
impl/util/JarManager.java scripting/ScriptEngine.java
Author: daijy
Date: Mon Feb 10 23:28:05 2014
New Revision: 1566847
URL: http://svn.apache.org/r1566847
Log:
PIG-3756: Add support for non-Java UDFs
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Feb 10 23:28:05 2014
@@ -45,22 +45,38 @@ public class TezPlanContainer extends Op
this.pigContext = pigContext;
}
- // Use pig.jar and udf jars for the AM resources (all DAG in the planContainer will
- // use it for simplicity)
+ // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
+ // will use them for simplicity). This differs from MR Pig, where they are added to
+ // the job jar.
public Map<String, LocalResource> getLocalResources() throws Exception {
Set<URL> jarLists = new HashSet<URL>();
jarLists.add(TezResourceManager.getBootStrapJar());
+ // In MR Pig the extra jars and script jars get put in Distributed Cache, but
+ // in Tez we'll add them as local resources.
for (java.net.URL jarUrl : pigContext.extraJars) {
jarLists.add(ConverterUtils.getYarnUrlFromURI(jarUrl.toURI()));
}
+ for (String jarFile : pigContext.scriptJars) {
+ jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(jarFile).toURI()));
+ }
+
+ // Script files for non-Java UDF's are added to the Job.jar by the JarManager class,
+ // except for Groovy files, which need to be explicitly added as local resources due
+ // to the GroovyScriptEngine (see JarManager.java for comments).
+ for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
+ if (scriptFile.getKey().endsWith(".groovy")) {
+ jarLists.add(ConverterUtils.getYarnUrlFromURI(scriptFile.getValue().toURI()));
+ }
+ }
+
TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
tezPlanContainerUDFCollector.visit();
Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
- for (String func: udfs) {
+ for (String func : udfs) {
Class clazz = pigContext.getClassForAlias(func);
if (clazz != null) {
String jarName = JarManager.findContainingJar(clazz);
@@ -71,8 +87,17 @@ public class TezPlanContainer extends Op
// avoid NPE.
continue;
}
+
URL jarUrl = ConverterUtils.getYarnUrlFromURI(new File(jarName).toURI());
jarLists.add(jarUrl);
+
+ // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+ // resources for them yet.
+ // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+ // for (String fileName : StreamingUDF.getResourcesForJar()) {
+ // jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
+ // }
+ // }
}
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Feb 10 23:28:05 2014
@@ -137,12 +137,32 @@ public class JarManager {
for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
}
-
+
Iterator<JarListEntry> it = jarList.iterator();
while (it.hasNext()) {
JarListEntry jarEntry = it.next();
mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
}
+
+ // Just like in MR Pig, we'll add the script files to Job.jar. For Jython, MR Pig packages
+ // all the dependencies in Job.jar. For JRuby, we need the resource pigudf.rb, which is in
+ // the pig jar. JavaScript files could be packaged either in the Job.jar or as Tez local
+ // resources; MR Pig adds them to the Job.jar so that's what we will do also. Groovy files
+ // must be added as Tez local resources in the TezPlanContainer (in MR Pig Groovy UDF's
+ // are actually broken since they cannot be found by the GroovyScriptEngine).
+ for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+ InputStream stream = null;
+ if (entry.getValue().exists()) {
+ stream = new FileInputStream(entry.getValue());
+ } else {
+ stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+ }
+ if (stream == null) {
+ throw new IOException("Cannot find " + entry.getValue().getPath());
+ }
+ addStream(jarFile, entry.getKey(), stream, contents);
+ }
+
jarFile.close();
}
Modified: pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java Mon Feb 10 23:28:05 2014
@@ -47,14 +47,12 @@ public abstract class ScriptEngine {
public static enum SupportedScriptLang {
- // possibly jruby in the future
jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine"),
streaming_python(new String[]{"streaming_python"}, new String[]{}, "org.apache.pig.scripting.streaming.python.PythonScriptEngine");
-
private static Set<String> supportedScriptLangs;
static {
supportedScriptLangs = new HashSet<String>();