You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/06 09:42:33 UTC

svn commit: r1198271 - in /pig/branches/branch-0.10: CHANGES.txt src/org/apache/pig/Main.java src/org/apache/pig/impl/PigContext.java src/org/apache/pig/scripting/Pig.java test/e2e/pig/tests/turing_jython.conf

Author: daijy
Date: Sun Nov  6 08:42:33 2011
New Revision: 1198271

URL: http://svn.apache.org/viewvc?rev=1198271&view=rev
Log:
PIG-2165: Need a way to deal with params and param_file in embedded pig in python

Modified:
    pig/branches/branch-0.10/CHANGES.txt
    pig/branches/branch-0.10/src/org/apache/pig/Main.java
    pig/branches/branch-0.10/src/org/apache/pig/impl/PigContext.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/Pig.java
    pig/branches/branch-0.10/test/e2e/pig/tests/turing_jython.conf

Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1198271&r1=1198270&r2=1198271&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Sun Nov  6 08:42:33 2011
@@ -148,6 +148,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2165: Need a way to deal with params and param_file in embedded pig in python (daijy)
+
 PIG-2313: NPE in ILLUSTRATE trying to get StatusReporter in STORE (daijy)
 
 PIG-2335: bin/pig does not work with bash 3.0 (azaroth)

Modified: pig/branches/branch-0.10/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/Main.java?rev=1198271&r1=1198270&r2=1198271&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/Main.java Sun Nov  6 08:42:33 2011
@@ -135,8 +135,8 @@ static int run(String args[], PigProgres
         boolean debug = false;
         boolean dryrun = false;
         boolean embedded = false;
-        ArrayList<String> params = new ArrayList<String>();
-        ArrayList<String> paramFiles = new ArrayList<String>();
+        List<String> params = new ArrayList<String>();
+        List<String> paramFiles = new ArrayList<String>();
         HashSet<String> optimizerRules = new HashSet<String>();
 
         CmdLineParser opts = new CmdLineParser(pigArgs);
@@ -370,6 +370,11 @@ static int run(String args[], PigProgres
         Grunt grunt = null;
         BufferedReader in;
         String substFile = null;
+        
+        paramFiles = fetchRemoteParamFiles(paramFiles, properties);
+        pigContext.setParams(params);
+        pigContext.setParamFiles(paramFiles);
+        
         switch (mode) {
         
         case FILE: {
@@ -394,7 +399,8 @@ static int run(String args[], PigProgres
             
             // run parameter substitution preprocessor first
             substFile = file + ".substituted";
-                pin = runParamPreprocessor(properties, in, params, paramFiles,
+            
+                pin = runParamPreprocessor(pigContext, in,
                         substFile, debug || dryrun || checkScriptOnly);
             if (dryrun) {
                 if (dryrun(substFile, pigContext)) {
@@ -522,7 +528,7 @@ static int run(String args[], PigProgres
             
             // run parameter substitution preprocessor first
             substFile = remainders[0] + ".substituted";
-            pin = runParamPreprocessor(properties, in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly);
+            pin = runParamPreprocessor(pigContext, in, substFile, debug || dryrun || checkScriptOnly);
             if (dryrun) {
                 if (dryrun(substFile, pigContext)) {
                     log.info("Dry run completed. Substituted pig script is at "
@@ -691,17 +697,21 @@ private static void configureLog4J(Prope
     pigContext.setLog4jProperties(backendProps);
     pigContext.setDefaultLogLevel(logLevel);
 }
- 
-// returns the stream of final pig script to be passed to Grunt
-private static BufferedReader runParamPreprocessor(Properties properties, BufferedReader origPigScript, ArrayList<String> params,
-                                            ArrayList<String> paramFiles, String scriptFile, boolean createFile) 
-                                throws org.apache.pig.tools.parameters.ParseException, IOException{
-    
-    ArrayList<String> paramFiles2 = new ArrayList<String>();
+
+
+private static List<String> fetchRemoteParamFiles(List<String> paramFiles, Properties properties)
+        throws IOException {
+    List<String> paramFiles2 = new ArrayList<String>();
     for (String param: paramFiles) {
         FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, param);
         paramFiles2.add(localFileRet.file.getAbsolutePath());
-    }    
+    }
+    return paramFiles2;
+}
+// returns the stream of final pig script to be passed to Grunt
+private static BufferedReader runParamPreprocessor(PigContext context, BufferedReader origPigScript, 
+                                            String scriptFile, boolean createFile) 
+                                throws org.apache.pig.tools.parameters.ParseException, IOException{
     
     ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
     String[] type1 = new String[1];
@@ -709,14 +719,14 @@ private static BufferedReader runParamPr
 
     if (createFile){
         BufferedWriter fw = new BufferedWriter(new FileWriter(scriptFile));
-        psp.genSubstitutedFile (origPigScript, fw, params.size() > 0 ? params.toArray(type1) : null, 
-                                paramFiles.size() > 0 ? paramFiles2.toArray(type2) : null);
+        psp.genSubstitutedFile (origPigScript, fw, context.getParams().size() > 0 ? context.getParams().toArray(type1) : null, 
+                                context.getParamFiles().size() > 0 ? context.getParamFiles().toArray(type2) : null);
         return new BufferedReader(new FileReader (scriptFile));
 
     } else {
         StringWriter writer = new StringWriter();
-        psp.genSubstitutedFile (origPigScript, writer,  params.size() > 0 ? params.toArray(type1) : null, 
-                                paramFiles.size() > 0 ? paramFiles2.toArray(type2) : null);
+        psp.genSubstitutedFile (origPigScript, writer,  context.getParams().size() > 0 ? context.getParams().toArray(type1) : null, 
+                                context.getParamFiles().size() > 0 ? context.getParamFiles().toArray(type2) : null);
         return new BufferedReader(new StringReader(writer.toString()));
     }
 }

Modified: pig/branches/branch-0.10/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/impl/PigContext.java?rev=1198271&r1=1198270&r2=1198271&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/impl/PigContext.java Sun Nov  6 08:42:33 2011
@@ -134,6 +134,24 @@ public class PigContext implements Seria
 
     static private ClassLoader classloader = PigContext.class.getClassLoader();
     
+    private List<String> params;
+    public List<String> getParams() {
+        return params;
+    }
+
+    public void setParams(List<String> params) {
+        this.params = params;
+    }
+
+    public List<String> getParamFiles() {
+        return paramFiles;
+    }
+
+    public void setParamFiles(List<String> paramFiles) {
+        this.paramFiles = paramFiles;
+    }
+    private List<String> paramFiles;
+    
     public PigContext() {
         this(ExecType.MAPREDUCE, new Properties());
     }

Modified: pig/branches/branch-0.10/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/Pig.java?rev=1198271&r1=1198270&r2=1198271&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/Pig.java Sun Nov  6 08:42:33 2011
@@ -280,6 +280,11 @@ public class Pig {
             plist.add(entry.getKey() + "="
                     + fixNonEscapedDollarSign(entry.getValue().toString()));
         }
+        if (getScriptContext().getPigContext().getParams()!=null) {
+            for (String param : getScriptContext().getPigContext().getParams()) {
+                plist.add(param);
+            }
+        }
         
         ParameterSubstitutionPreprocessor psp = 
             new ParameterSubstitutionPreprocessor(50);
@@ -288,8 +293,12 @@ public class Pig {
         
         StringWriter writer = new StringWriter();
         BufferedReader in = new BufferedReader(new StringReader(qstr));
+        String[] type1 = new String[1];
         try {
-            psp.genSubstitutedFile(in, writer, plist.toArray(params), null);
+            psp.genSubstitutedFile(in, writer, plist.toArray(params), 
+                    scriptContext.getPigContext().getParamFiles()!=null && 
+                    scriptContext.getPigContext().getParamFiles().size() > 0 ? 
+                    scriptContext.getPigContext().getParamFiles().toArray(type1) : null);
         } catch (org.apache.pig.tools.parameters.ParseException e) {
             throw new IOException("Param substitution failed", e);            
         } 

Modified: pig/branches/branch-0.10/test/e2e/pig/tests/turing_jython.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/e2e/pig/tests/turing_jython.conf?rev=1198271&r1=1198270&r2=1198271&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/e2e/pig/tests/turing_jython.conf (original)
+++ pig/branches/branch-0.10/test/e2e/pig/tests/turing_jython.conf Sun Nov  6 08:42:33 2011
@@ -400,9 +400,57 @@ else:
 \
                         ,'floatpostprocess' => 1
                         ,'delimiter' => '	'
-		}
-      ] 
 	},{
+                'num' => 11
+                ,'pig_params' => ['-p', qq(loadfile='studenttab10k')],
+                ,'pig' => q\#!/usr/bin/python
+from org.apache.pig.scripting import Pig
+
+P = Pig.compile("""A = load ':INPATH:/singlefile/$loadfile' as (name, age, gpa);
+store A into ':OUTPATH:';""")
+
+Q = P.bind()
+
+result = Q.runSingle()
+
+if result.isSuccessful():
+    print "Pig job PASSED"
+else:
+    raise "Pig job FAILED"
+\,
+
+             'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+                                      store A into ':OUTPATH:';
+\
+                        ,'floatpostprocess' => 1
+                        ,'delimiter' => '       '
+        },{
+                'num' => 12
+                ,'pig_params' => ['-m', ":PARAMPATH:/params_3"],
+                ,'pig' => q\#!/usr/bin/python
+from org.apache.pig.scripting import Pig
+
+P = Pig.compile("""A = load ':INPATH:/singlefile/$fname' as (name, age, gpa);
+store A into ':OUTPATH:';""")
+
+Q = P.bind()
+
+result = Q.runSingle()
+
+if result.isSuccessful():
+    print "Pig job PASSED"
+else:
+    raise "Pig job FAILED"
+\,
+
+             'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+                                      store A into ':OUTPATH:';
+\
+                        ,'floatpostprocess' => 1
+                        ,'delimiter' => '       '
+                }
+      ]
+        },{
 	'name' => 'Jython_Diagnostics'
        ,'tests' => [
                 {