You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/11 00:28:05 UTC

svn commit: r1566847 - in /pig/branches/tez/src/org/apache/pig: backend/hadoop/executionengine/tez/TezPlanContainer.java impl/util/JarManager.java scripting/ScriptEngine.java

Author: daijy
Date: Mon Feb 10 23:28:05 2014
New Revision: 1566847

URL: http://svn.apache.org/r1566847
Log:
PIG-3756: Add support for non-Java UDF's

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
    pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Feb 10 23:28:05 2014
@@ -45,22 +45,38 @@ public class TezPlanContainer extends Op
         this.pigContext = pigContext;
     }
 
-    // Use pig.jar and udf jars for the AM resources (all DAG in the planContainer will
-    // use it for simplicity)
+    // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
+    // will use them for simplicity). This differs from MR Pig, where they are added to
+    // the job jar.
     public Map<String, LocalResource> getLocalResources() throws Exception {
         Set<URL> jarLists = new HashSet<URL>();
 
         jarLists.add(TezResourceManager.getBootStrapJar());
 
+        // In MR Pig the extra jars and script jars get put in Distributed Cache, but
+        // in Tez we'll add them as local resources.
         for (java.net.URL jarUrl : pigContext.extraJars) {
             jarLists.add(ConverterUtils.getYarnUrlFromURI(jarUrl.toURI()));
         }
 
+        for (String jarFile : pigContext.scriptJars) {
+            jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(jarFile).toURI()));
+        }
+
+        // Script files for non-Java UDF's are all added to the Job.jar by the JarManager class.
+        // Groovy files must additionally be added as local resources here so that the
+        // GroovyScriptEngine can find them (see JarManager.java for comments).
+        for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
+            if (scriptFile.getKey().endsWith(".groovy")) {
+                jarLists.add(ConverterUtils.getYarnUrlFromURI(scriptFile.getValue().toURI()));
+            }
+        }
+
         TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
         tezPlanContainerUDFCollector.visit();
         Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
 
-        for (String func: udfs) {
+        for (String func : udfs) {
             Class clazz = pigContext.getClassForAlias(func);
             if (clazz != null) {
                 String jarName = JarManager.findContainingJar(clazz);
@@ -71,8 +87,17 @@ public class TezPlanContainer extends Op
                     // avoid NPE.
                     continue;
                 }
+
                 URL jarUrl = ConverterUtils.getYarnUrlFromURI(new File(jarName).toURI());
                 jarLists.add(jarUrl);
+
+                // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+                // resources for them yet.
+                // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+                //     for (String fileName : StreamingUDF.getResourcesForJar()) {
+                //         jarLists.add(ConverterUtils.getYarnUrlFromURI(new File(fileName).toURI()));
+                //     }
+                // }
             }
         }
 

Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Feb 10 23:28:05 2014
@@ -137,12 +137,32 @@ public class JarManager {
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
             addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
         }
-        
+
         Iterator<JarListEntry> it = jarList.iterator();
         while (it.hasNext()) {
             JarListEntry jarEntry = it.next();
             mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
         }
+
+        // Just like in MR Pig, we'll add the script files to Job.jar. For Jython, MR Pig packages
+        // all the dependencies in Job.jar. For JRuby, we need the resource pigudf.rb, which is in
+        // the pig jar. JavaScript files could be packaged either in the Job.jar or as Tez local
+        // resources; MR Pig adds them to the Job.jar so that's what we will do also. Groovy files
+        // must be added as Tez local resources in the TezPlanContainer (in MR Pig Groovy UDF's
+        // are actually broken since they cannot be found by the GroovyScriptEngine).
+        for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+            InputStream stream = null;
+            if (entry.getValue().exists()) {
+                stream = new FileInputStream(entry.getValue());
+            } else {
+                stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+            }
+            if (stream == null) {
+                throw new IOException("Cannot find " + entry.getValue().getPath());
+            }
+            addStream(jarFile, entry.getKey(), stream, contents);
+        }
+
         jarFile.close();
     }
 

Modified: pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java?rev=1566847&r1=1566846&r2=1566847&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/tez/src/org/apache/pig/scripting/ScriptEngine.java Mon Feb 10 23:28:05 2014
@@ -47,14 +47,12 @@ public abstract class ScriptEngine {
 
     public static enum SupportedScriptLang {
 
-        // possibly jruby in the future
         jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
         jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
         javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
         groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine"),
         streaming_python(new String[]{"streaming_python"}, new String[]{}, "org.apache.pig.scripting.streaming.python.PythonScriptEngine");
 
-
         private static Set<String> supportedScriptLangs;
         static {
             supportedScriptLangs = new HashSet<String>();