You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/06 07:37:59 UTC

svn commit: r1510858 [1/2] - in /pig/trunk: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/impl/ src/org/apache/pig/parser/ src/org/apache/pig/scripting/ src/org/apache/pig/tools/ src/org/apache/pig/tools/grunt/ src...

Author: cheolsoo
Date: Tue Aug  6 05:37:59 2013
New Revision: 1510858

URL: http://svn.apache.org/r1510858
Log:
PIG-3359: Register Statements and Param Substitution in Macros (jpacker via cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/PigMacro.java
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
    pig/trunk/src/org/apache/pig/scripting/Pig.java
    pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
    pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    pig/trunk/src/org/apache/pig/tools/parameters/ParameterSubstitutionPreprocessor.java
    pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java
    pig/trunk/test/org/apache/pig/pigunit/PigTest.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug  6 05:37:59 2013
@@ -28,6 +28,8 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-3359: Register Statements and Param Substitution in Macros (jpacker via cheolsoo)
+
 PIG-3182: Pig currently lacks functions to trim the whitespace only on one hand side (sarutak via cheolsoo)
 
 PIG-3163: Pig current releases lack a UDF endsWith. This UDF tests if a given string ends with the specified suffix (sriramkrishnan via cheolsoo)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml Tue Aug  6 05:37:59 2013
@@ -1001,9 +1001,8 @@ public class idmapreduce{
    <p>Note the following restrictions:</p>
    <ul>
    <li>Macros are not allowed inside a <a href="basic.html#nested-block">FOREACH</a> nested block.</li>
-   <li>Macros can only contain <a href="start.html#pl-statements">Pig Latin statements</a>. The <a href="basic.html#register">REGISTER</a>  statement is not supported. The <a href="cmds.html#shell-cmds">shell commands</a> (used with Grunt) are not supported.</li>
+   <li>Macros cannot contain <a href="cmds.html#shell-cmds">Grunt shell commands</a>.</li>
    <li>Macros cannot include a user-defined schema that has a name collision with an alias in the macro.</li>
-   <li><a href="#Parameter-Sub">Parameter substitution</a> cannot be used inside of macros. Parameters should be explicitly passed to macros and parameter substitution used only at the top level.</li>
    </ul>
    <p></p>
 
@@ -1044,7 +1043,7 @@ STORE Y INTO 'output';
 
 <!-- +++++++++++++++++++++++++++++++++++++++++++++++ -->
 <p><strong>Macro Import</strong></p>
-<p>A macro can be imported from another Pig script (see <a href="#import-macros">IMPORT (macros)</a>).</p>
+<p>A macro can be imported from another Pig script (see <a href="#import-macros">IMPORT (macros)</a>). Splitting your macros from your main Pig script is useful for making reusable code.</p>
 </section> 
 
 
@@ -1188,7 +1187,7 @@ STORE Y INTO 'byuser';
    <title>Usage</title>
    <p>Use the IMPORT command to import a macro defined in a separate file into your Pig script. </p>
    <p>IMPORT adds the macro definitions to the Pig Latin namespace; these macros can then be invoked as if they were defined in the same file.</p>
-   <p>Macros can only contain Pig Latin statements; Grunt shell commands are not supported.</p>
+   <p>Macros can only contain Pig Latin statements; Grunt shell commands are not supported. However, REGISTER statements and parameter definitions with %default or %declare are valid. Your macro file can also IMPORT other macro files, so long as these imports are not recursive.</p>
    
    <p>See also: <a href="#define-macros">DEFINE (macros)</a></p>
      </section> 
@@ -1395,7 +1394,7 @@ IMPORT 'my_macro.pig';
       </li>
    </ul>
    <p></p>
-   <p>Parameter substitution CANNOT be used inside of macros.  Parameters should be explicitly passed to macros and parameter substitution used only at the top level (see <a href="#define-macros">DEFINE (macros)</a>).</p>
+   <p>Parameter substitution may be used inside of macros, but it is the responsibility of the user to ensure that there are no conflicts between names of parameters defined at the top level and names of arguments or return values for a macro. A simple way to ensure this is to use ALL_CAPS for top-level parameters and lower_case for macro-level parameters. See <a href="#define-macros">DEFINE (macros)</a>.</p>
    </section>
    
    <section>

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Tue Aug  6 05:37:59 2013
@@ -29,7 +29,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
-import java.io.StringWriter;
 import java.text.ParseException;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -74,7 +73,6 @@ import org.apache.pig.scripting.ScriptEn
 import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
-import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
@@ -634,13 +632,13 @@ static int run(String args[], PigProgres
         }
 
         if(!gruntCalled) {
-        	LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
+            LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
         }
     } catch (Throwable e) {
         rc = ReturnCode.THROWABLE_EXCEPTION;
         PigStatsUtil.setErrorMessage(e.getMessage());
         if(!gruntCalled) {
-        	LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
+            LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
         }
     } finally {
         // clear temp files
@@ -745,7 +743,7 @@ private static void configureLog4J(Prope
     PropertyConfigurator.configure(props);
     logLevel = Logger.getLogger("org.apache.pig").getLevel();
     if (logLevel==null) {
-    	logLevel = Logger.getLogger("org.apache.pig").getEffectiveLevel();
+        logLevel = Logger.getLogger("org.apache.pig").getEffectiveLevel();
     }
     Properties backendProps = pigContext.getLog4jProperties();
     backendProps.setProperty("log4j.logger.org.apache.pig", logLevel.toString());
@@ -768,21 +766,11 @@ private static BufferedReader runParamPr
                                             String scriptFile, boolean createFile)
                                 throws org.apache.pig.tools.parameters.ParseException, IOException{
 
-    ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
-    String[] type1 = new String[1];
-    String[] type2 = new String[1];
-
-    if (createFile){
-        BufferedWriter fw = new BufferedWriter(new FileWriter(scriptFile));
-        psp.genSubstitutedFile (origPigScript, fw, context.getParams().size() > 0 ? context.getParams().toArray(type1) : null,
-                                context.getParamFiles().size() > 0 ? context.getParamFiles().toArray(type2) : null);
-        return new BufferedReader(new FileReader (scriptFile));
-
+    if (createFile) {
+        return context.doParamSubstitutionOutputToFile(origPigScript, scriptFile);
     } else {
-        StringWriter writer = new StringWriter();
-        psp.genSubstitutedFile (origPigScript, writer,  context.getParams().size() > 0 ? context.getParams().toArray(type1) : null,
-                                context.getParamFiles().size() > 0 ? context.getParamFiles().toArray(type2) : null);
-        return new BufferedReader(new StringReader(writer.toString()));
+        String substituted = context.doParamSubstitution(origPigScript);
+        return new BufferedReader(new StringReader(substituted));
     }
 }
 
@@ -837,7 +825,7 @@ public static String getVersionString() 
  */
 public static void usage()
 {
-	System.out.println("\n"+getVersionString()+"\n");
+        System.out.println("\n"+getVersionString()+"\n");
         System.out.println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
         System.out.println("       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
         System.out.println("       Pig [options] [-f[ile]] file : Run cmds found in file.");
@@ -850,7 +838,7 @@ public static void usage()
         System.out.println("    -f, -file - Path to the script to execute");
         System.out.println("    -g, -embedded - ScriptEngine classname or keyword for the ScriptEngine");
         System.out.println("    -h, -help - Display this message. You can specify topic to get help for that topic.");
-	System.out.println("        properties is the only topic currently supported: -h properties.");
+        System.out.println("        properties is the only topic currently supported: -h properties.");
         System.out.println("    -i, -version - Display version information");
         System.out.println("    -l, -logfile - Path to client side log file; default is current working directory.");
         System.out.println("    -m, -param_file - Path to the parameter file");
@@ -879,8 +867,8 @@ public static void usage()
 }
 
 public static void printProperties(){
-	System.out.println("The following properties are supported:");
-	System.out.println("    Logging:");
+        System.out.println("The following properties are supported:");
+        System.out.println("    Logging:");
         System.out.println("        verbose=true|false; default is false. This property is the same as -v switch");
         System.out.println("        brief=true|false; default is false. This property is the same as -b switch");
         System.out.println("        debug=OFF|ERROR|WARN|INFO|DEBUG; default is INFO. This property is the same as -d switch");
@@ -888,7 +876,7 @@ public static void printProperties(){
         System.out.println("            of each type rather than logging each warning.");
         System.out.println("    Performance tuning:");
         System.out.println("        pig.cachedbag.memusage=<mem fraction>; default is 0.2 (20% of all memory).");
-	System.out.println("            Note that this memory is shared across all large bags used by the application.");
+        System.out.println("            Note that this memory is shared across all large bags used by the application.");
         System.out.println("        pig.skewedjoin.reduce.memusagea=<mem fraction>; default is 0.3 (30% of all memory).");
         System.out.println("            Specifies the fraction of heap available for the reducer to perform the join.");
         System.out.println("        pig.exec.nocombiner=true|false; default is false. ");
@@ -916,7 +904,7 @@ public static void printProperties(){
         System.out.println("        stop.on.failure=true|false; default is false. Set to true to terminate on the first error.");
         System.out.println("        pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.");
         System.out.println("            Determines the timezone used to handle datetime datatype and UDFs. ");
-	System.out.println("Additionally, any Hadoop property can be specified.");
+        System.out.println("Additionally, any Hadoop property can be specified.");
 }
 
 private static String validateLogFile(String logFileName, String scriptName) {
@@ -1021,25 +1009,25 @@ private static SupportedScriptLang deter
 
 private static int runEmbeddedScript(PigContext pigContext, String file, String engine)
 throws IOException {
-	log.info("Run embedded script: " + engine);
-	pigContext.connect();
-	ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
-	Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
-	PigStatsUtil.setStatsMap(statsMap);
-
-	int failCount = 0;
-	int totalCount = 0;
-	for (List<PigStats> lst : statsMap.values()) {
-		if (lst != null && !lst.isEmpty()) {
-			for (PigStats stats : lst) {
-				if (!stats.isSuccessful()) failCount++;
-				totalCount++;
-			}
-		}
-	}
-	return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
-			: (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
-					: ReturnCode.SUCCESS;
+    log.info("Run embedded script: " + engine);
+    pigContext.connect();
+    ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
+    Map<String, List<PigStats>> statsMap = scriptEngine.run(pigContext, file);
+    PigStatsUtil.setStatsMap(statsMap);
+
+    int failCount = 0;
+    int totalCount = 0;
+    for (List<PigStats> lst : statsMap.values()) {
+        if (lst != null && !lst.isEmpty()) {
+            for (PigStats stats : lst) {
+                if (!stats.isSuccessful()) failCount++;
+                totalCount++;
+            }
+        }
+    }
+    return (totalCount > 0 && failCount == totalCount) ? ReturnCode.FAILURE
+            : (failCount > 0) ? ReturnCode.PARTIAL_FAILURE
+                    : ReturnCode.SUCCESS;
 }
 
 }

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Tue Aug  6 05:37:59 2013
@@ -23,10 +23,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.io.StringReader;
-import java.io.StringWriter;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -85,7 +83,6 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.ScalarExpression;
 import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
 import org.apache.pig.newplan.logical.optimizer.DanglingNestedNodeRemover;
-import org.apache.pig.newplan.logical.optimizer.UidResetter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOStore;
@@ -104,7 +101,6 @@ import org.apache.pig.parser.QueryParser
 import org.apache.pig.pen.ExampleGenerator;
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
-import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -491,6 +487,11 @@ public class PigServer {
      * @throws IOException
      */
     public void registerJar(String name) throws IOException {
+        if (pigContext.hasJar(name)) {
+            log.debug("Ignoring duplicate registration for jar " + name);
+            return;
+        }
+
         // first try to locate jar via system resources
         // if this fails, try by using "name" as File (this preserves
         // compatibility with case when user passes absolute path or path
@@ -505,19 +506,18 @@ public class PigServer {
 
             if (resource == null) {
                 FetchFileRet[] files = FileLocalizer.fetchFiles(pigContext.getProperties(), name);
+                for (FetchFileRet file : files) {
+                    File f = file.file;
+                    if (!f.canRead()) {
+                        int errCode = 4002;
+                        String msg = "Can't read jar file: " + name;
+                        throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
+                    }
 
-                for(FetchFileRet file : files) {
-                  File f = file.file;
-                  if (!f.canRead()) {
-                    int errCode = 4002;
-                    String msg = "Can't read jar file: " + name;
-                    throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
-                  }
-
-                  pigContext.addJar(f.toURI().toURL());
+                    pigContext.addJar(f.toURI().toURL(), name);
                 }
             } else {
-              pigContext.addJar(resource);
+                pigContext.addJar(resource, name);
             }
         }
     }
@@ -531,7 +531,15 @@ public class PigServer {
      * @throws IOException
      */
     public void registerCode(String path, String scriptingLang, String namespace)
-    throws IOException {
+                             throws IOException {
+        if (pigContext.scriptingUDFs.containsKey(path) &&
+            pigContext.scriptingUDFs.get(path).equals(namespace)) {
+            log.debug("Ignoring duplicate registration for scripting udf file " + path + " in namespace " + namespace);
+            return;
+        } else {
+            pigContext.scriptingUDFs.put(path, namespace);
+        }
+
         File f = FileLocalizer.fetchFile(pigContext.getProperties(), path).file;
         if (!f.canRead()) {
             int errCode = 4002;
@@ -632,7 +640,7 @@ public class PigServer {
      */
     public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
         try {
-            String substituted = doParamSubstitution(in, params, paramsFiles);
+            String substituted = pigContext.doParamSubstitution(in, paramMapToList(params), paramsFiles);
             GruntParser grunt = new GruntParser(new StringReader(substituted));
             grunt.setInteractive(false);
             grunt.setParams(this);
@@ -643,39 +651,14 @@ public class PigServer {
         }
     }
 
-    /**
-     * Do parameter substitution.
-     * @param in The InputStream of file containing Pig Latin to do substitution on.
-     * @param params Parameters to use to substitute
-     * @param paramsFiles Files to use to do substitution.
-     * @return String containing Pig Latin with substitutions done
-     * @throws IOException
-     */
-    protected String doParamSubstitution(InputStream in,
-                                         Map<String,String> params,
-                                         List<String> paramsFiles) throws IOException {
-        // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
+    protected List<String> paramMapToList(Map<String, String> params) {
         List<String> paramList = new ArrayList<String>();
         if (params != null) {
             for (Map.Entry<String, String> entry : params.entrySet()) {
                 paramList.add(entry.getKey() + "=" + entry.getValue());
-             }
-        }
-
-        // do parameter substitution
-        try {
-            ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
-            StringWriter writer = new StringWriter();
-            psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(in)),
-                                   writer,
-                                   paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
-                                   paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
-
-            return writer.toString();
-        } catch (org.apache.pig.tools.parameters.ParseException e) {
-            log.error(e.getLocalizedMessage());
-            throw new IOException(e.getCause());
+            }
         }
+        return paramList;
     }
 
     /**
@@ -1203,9 +1186,9 @@ public class PigServer {
      */
     public void shutdown() {
         // clean-up activities
-            // TODO: reclaim scope to free up resources. Currently
+        // TODO: reclaim scope to free up resources. Currently
         // this is not implemented and throws an exception
-            // hence, for now, we won't call it.
+        // hence, for now, we won't call it.
         //
         // pigContext.getExecutionEngine().reclaimScope(this.scope);
 

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Aug  6 05:37:59 2013
@@ -17,9 +17,17 @@
  */
 package org.apache.pig.impl;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.StringWriter;
 import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -36,6 +44,7 @@ import java.util.Properties;
 import java.util.StringTokenizer;
 import java.util.Vector;
 
+import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +65,9 @@ import org.apache.pig.backend.hadoop.str
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.tools.parameters.ParseException;
+import org.apache.pig.tools.parameters.PreprocessorContext;
 
 public class PigContext implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -77,12 +89,6 @@ public class PigContext implements Seria
     //one of: local, mapreduce, pigbody
     private ExecType execType;;
 
-    //  extra jar files that are needed to run a job
-    transient public List<URL> extraJars = new LinkedList<URL>();
-
-    //  The jars that should not be merged in. (Some functions may come from pig.jar and we don't want the whole jar file.)
-    transient public Vector<String> skipJars = new Vector<String>(2);
-
     //main file system that jobs and shell commands access
     transient private DataStorage dfs;
 
@@ -94,13 +100,36 @@ public class PigContext implements Seria
 
     private Properties properties;
 
-    //  script files that are needed to run a job
+    /*
+     * Resources for the job (jars, scripting udf files, cached macro abstract syntax trees)
+     */
+
+    // extra jar files that are needed to run a job
+    transient public List<URL> extraJars = new LinkedList<URL>();
+
+    // original paths each extra jar came from
+    // used to avoid redundant imports
+    transient private Map<URL, String> extraJarOriginalPaths = new HashMap<URL, String>();
+
+    // jars needed for scripting udfs - jython.jar etc
+    public List<String> scriptJars = new ArrayList<String>(2);
+
+    // jars that should not be merged in.
+    // (some functions may come from pig.jar and we don't want the whole jar file.)
+    transient public Vector<String> skipJars = new Vector<String>(2);
+
+    // script files that are needed to run a job
     @Deprecated
     public List<String> scriptFiles = new ArrayList<String>();
     private Map<String,File> aliasedScriptFiles = new LinkedHashMap<String,File>();
 
-    //  script jars that are needed to run a script - jython.jar etc
-    public List<String> scriptJars = new ArrayList<String>(2);
+    // record of scripting udf file path --> which namespace it was registered to
+    // used to avoid redundant imports
+    transient public Map<String, String> scriptingUDFs;
+
+    // cache of macro file path --> abstract syntax tree
+    // used to avoid re-parsing the same macros over and over
+    transient public Map<String, Tree> macros;
 
     /**
      * a table mapping function names to function specs.
@@ -157,12 +186,20 @@ public class PigContext implements Seria
 
     static private ContextClassLoader classloader = new ContextClassLoader(PigContext.class.getClassLoader());
 
+    /*
+     * Parameter-related fields
+     * params: list of strings "key=value" from the command line
+     * paramFiles: list of paths to parameter files
+     * preprocessorContext: manages parsing params and paramFiles into an actual map
+     */
 
     private List<String> params;
+    private List<String> paramFiles;
+    transient private PreprocessorContext preprocessorContext = new PreprocessorContext(50);
+
     public List<String> getParams() {
         return params;
     }
-
     public void setParams(List<String> params) {
         this.params = params;
     }
@@ -170,11 +207,27 @@ public class PigContext implements Seria
     public List<String> getParamFiles() {
         return paramFiles;
     }
-
     public void setParamFiles(List<String> paramFiles) {
         this.paramFiles = paramFiles;
     }
-    private List<String> paramFiles;
+
+    public PreprocessorContext getPreprocessorContext() {
+        return preprocessorContext;
+    }
+
+    public Map<String, String> getParamVal() throws IOException {
+        Map<String, String> paramVal = preprocessorContext.getParamVal();
+        if (paramVal == null) {
+            try {
+                preprocessorContext.loadParamVal(params, paramFiles);
+            } catch (ParseException e) {
+                throw new IOException(e.getMessage());
+            }
+            return preprocessorContext.getParamVal();
+        } else {
+            return paramVal;
+        }
+    }
 
     public PigContext() {
         this(ExecType.MAPREDUCE, new Properties());
@@ -207,6 +260,9 @@ public class PigContext implements Seria
         skippedShipPaths.add("/usr/sbin");
         skippedShipPaths.add("/usr/local/sbin");
         
+        macros = new HashMap<String, Tree>();
+        scriptingUDFs = new HashMap<String, String>();
+
         init();
     }
 
@@ -284,6 +340,15 @@ public class PigContext implements Seria
         }
     }
 
+    public boolean hasJar(String path) {
+        for (URL url : extraJars) {
+            if (extraJarOriginalPaths.get(url).equals(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * this method adds script files that must be added to the shipped jar
      * named differently from their local fs path.
@@ -299,18 +364,78 @@ public class PigContext implements Seria
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();
-            addJar(resource);
+            addJar(resource, path);
         }
     }
 
-    public void addJar(URL resource) throws MalformedURLException{
+    public void addJar(URL resource, String originalPath) throws MalformedURLException{
         if (resource != null) {
             extraJars.add(resource);
+            extraJarOriginalPaths.put(resource, originalPath);
             classloader.addURL(resource);
             Thread.currentThread().setContextClassLoader(PigContext.classloader);
         }
     }
 
+    public String doParamSubstitution(InputStream in,
+                                      List<String> params,
+                                      List<String> paramFiles)
+                                      throws IOException {
+
+        return doParamSubstitution(new BufferedReader(new InputStreamReader(in)),
+                                   params, paramFiles);
+    }
+
+    public String doParamSubstitution(BufferedReader reader,
+                                      List<String> params,
+                                      List<String> paramFiles)
+                                      throws IOException {
+        this.params = params;
+        this.paramFiles = paramFiles;
+        return doParamSubstitution(reader);
+    }
+
+    public String doParamSubstitution(BufferedReader reader) throws IOException {
+        try {
+            preprocessorContext.loadParamVal(params, paramFiles);
+            ParameterSubstitutionPreprocessor psp
+                = new ParameterSubstitutionPreprocessor(preprocessorContext);
+            StringWriter writer = new StringWriter();
+            psp.genSubstitutedFile(reader, writer);
+            return writer.toString();
+        } catch (ParseException e) {
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        }
+    }
+
+    public BufferedReader doParamSubstitutionOutputToFile(BufferedReader reader,
+                                                  String outputFilePath,
+                                                  List<String> params,
+                                                  List<String> paramFiles)
+                                                  throws IOException {
+        this.params = params;
+        this.paramFiles = paramFiles;
+        return doParamSubstitutionOutputToFile(reader, outputFilePath);
+    }
+
+    public BufferedReader doParamSubstitutionOutputToFile(BufferedReader reader, String outputFilePath)
+                          throws IOException {
+        try {
+            preprocessorContext.loadParamVal(params, paramFiles);
+            ParameterSubstitutionPreprocessor psp
+                    = new ParameterSubstitutionPreprocessor(preprocessorContext);
+            BufferedWriter writer = new BufferedWriter(new FileWriter(outputFilePath));
+            psp.genSubstitutedFile(reader, writer);
+            return new BufferedReader(new FileReader(outputFilePath));
+        } catch (ParseException e) {
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        } catch (FileNotFoundException e) {
+            throw new IOException("Could not find file to substitute parameters for: " + outputFilePath);
+        }
+    }
+
     /**
      * script files as name/file pairs to be added to the job jar
      * @return name/file pairs

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Tue Aug  6 05:37:59 2013
@@ -55,6 +55,8 @@ query : ^( QUERY statement* )
 
 statement : general_statement
           | split_statement { sb.append(";\n"); }
+          | import_statement { sb.append(";\n"); }
+          | register_statement { sb.append(";\n"); }
           | realias_statement
 ;
 
@@ -64,6 +66,29 @@ split_statement : split_clause
 realias_statement : realias_clause
 ;
 
+// Prints an IMPORT statement as " <IMPORT text> <quoted file>".
+// The terminating ";\n" is appended by the statement rule.
+// NOTE(review): unlike register_statement this emits a leading space - confirm that is intended.
+import_statement : ^( IMPORT QUOTEDSTRING ) {
+                       sb.append(" ").append($IMPORT.text).append(" ").append($QUOTEDSTRING.text);
+                   }
+;
+
+// Prints "<REGISTER text> <quoted path>", then the optional scripting clause.
+// The path is appended before scripting_udf_clause is walked, so the output order matches the source.
+register_statement : ^( REGISTER QUOTEDSTRING {
+                            sb.append($REGISTER.text).append(" ").append($QUOTEDSTRING.text);
+                        } scripting_udf_clause? )
+;
+
+// Optional "USING <lang> AS <namespace>" part of a REGISTER statement for scripting UDFs.
+scripting_udf_clause : scripting_language_clause scripting_namespace_clause
+;
+
+// Prints " USING <lang>".
+scripting_language_clause : (USING IDENTIFIER) {
+                                sb.append(" ").append($USING.text).append(" ").append($IDENTIFIER.text);
+                            }
+;
+
+// Prints " AS <namespace>".
+scripting_namespace_clause : (AS IDENTIFIER) {
+                                 sb.append(" ").append($AS.text).append(" ").append($IDENTIFIER.text);
+                             }
+;
+
 // For foreach statement that with complex inner plan.
 general_statement
     : ^( STATEMENT ( alias { sb.append(" = "); } )?
@@ -204,9 +229,15 @@ func_name
     : eid ( ( PERIOD { sb.append($PERIOD.text); } | DOLLAR { sb.append($DOLLAR.text); } ) eid )*
 ;
 
-func_args
-    : a=QUOTEDSTRING { sb.append($a.text); }
-        (b=QUOTEDSTRING { sb.append(", ").append($b.text); } )*
+// Prints function arguments separated by ", ". Each argument may be a single-line
+// QUOTEDSTRING or a MULTILINE_QUOTEDSTRING and is printed with its original text.
+func_args : func_first_arg_clause (func_next_arg_clause)*
+;
+
+// First argument: printed without a leading separator.
+func_first_arg_clause :   QUOTEDSTRING { sb.append($QUOTEDSTRING.text); }
+                        | MULTILINE_QUOTEDSTRING { sb.append($MULTILINE_QUOTEDSTRING.text); }
+;
+
+// Each following argument: printed with a leading ", ".
+func_next_arg_clause :    QUOTEDSTRING { sb.append(", ").append($QUOTEDSTRING.text); }
+                        | MULTILINE_QUOTEDSTRING { sb.append(", ").append($MULTILINE_QUOTEDSTRING.text); }
 ;
 
 cube_clause

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Tue Aug  6 05:37:59 2013
@@ -116,6 +116,7 @@ query : ^( QUERY statement* )
 statement : general_statement
           | split_statement
           | realias_statement
+          | register_statement
 ;
 
 split_statement : split_clause
@@ -124,6 +125,9 @@ split_statement : split_clause
 realias_statement : realias_clause
 ;
 
+// Accepts REGISTER 'path' with an optional USING <lang> AS <namespace> clause.
+// The rule has no actions. The jar-vs-script checks happen in QueryParserDriver.applyRegisters.
+register_statement : ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
+;
+
 general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause parallel_clause? )
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Tue Aug  6 05:37:59 2013
@@ -173,6 +173,7 @@ scope {
  : general_statement
  | split_statement
  | realias_statement
+ | register_statement
 ;
 
 split_statement : split_clause
@@ -181,6 +182,14 @@ split_statement : split_clause
 realias_statement : realias_clause
 ;
 
+// Matches REGISTER statements so the tree walk can skip over them.
+// QueryParserDriver executes them (applyRegisters) before this generator runs.
+register_statement
+: ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
+  {
+    // registers are handled by QueryParserDriver and are not actually part of the logical plan
+    // so we just ignore them here
+  }
+;
+
 general_statement
 : ^( STATEMENT ( alias { $statement::alias = $alias.name; } )? oa = op_clause parallel_clause? )
   {

Modified: pig/trunk/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/PigMacro.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/trunk/src/org/apache/pig/parser/PigMacro.java Tue Aug  6 05:37:59 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.StreamTokenizer;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -36,8 +37,10 @@ import org.antlr.runtime.tree.CommonTree
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.parser.PigParserNode.InvocationPoint;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.tools.parameters.PreprocessorContext;
 
 class PigMacro {
 
@@ -50,6 +53,7 @@ class PigMacro {
     private List<String> rets;
     private Map<String, PigMacro> seen;
     private Set<String> macroStack;
+    private PigContext pigContext;
     private long idx = 0;
     
     // The start line number of this macro in the script
@@ -82,6 +86,10 @@ class PigMacro {
     int getStartLine() {
         return startLine;
     }
+
+    /**
+     * Sets the PigContext whose parameter values are also substituted into
+     * this macro's body when the macro is inlined.
+     */
+    void setPigContext(PigContext context) {
+        pigContext = context;
+    }
     
     private CommonTree inline(String[] inputs, String[] outputs, CommonTree t,
             String file) throws ParserException {
@@ -150,19 +158,35 @@ class PigMacro {
                 args[params.size() + i] = rets.get(i) + "=" + outputs[i];
             }
         }
+
         StringWriter writer = new StringWriter();
         BufferedReader in = new BufferedReader(new StringReader(body));
+
         try {
-            ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(
-                    50);
-            psp.genSubstitutedFile(in, writer, args, null);
+            PreprocessorContext pc = new PreprocessorContext(50);
+            pc.loadParamVal(Arrays.asList(args), null);
+
+            Map<String, String> paramVal = pc.getParamVal();
+            for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) {
+                if (paramVal.containsKey(e.getKey())) {
+                    throw new ParserException(
+                        "Macro contains argument or return value " + e.getKey() + " which conflicts " +
+                        "with a Pig parameter of the same name."
+                    );
+                } else {
+                    paramVal.put(e.getKey(), e.getValue());
+                }
+            }
+            
+            ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc);
+            psp.genSubstitutedFile(in, writer);
         } catch (Exception e) {
             // catch both ParserException and RuntimeException
             String msg = getErrorMessage(file, line,
                     "Macro inline failed for macro '" + name + "'",
                     e.getMessage() + "\n Macro content: " + body);
             throw new ParserException(msg);
-        } 
+        }
         
         LOG.debug("--- after substition:\n" + writer.toString());
         
@@ -234,7 +258,7 @@ class PigMacro {
 
         for (CommonTree t : inlineNodes) {
             CommonTree newTree = macroInline(t,
-                    new ArrayList<PigMacro>(seen.values()), macroStack);
+                    new ArrayList<PigMacro>(seen.values()), macroStack, pigContext);
             QueryParserUtils.replaceNodeWithNodeList(t, newTree, null);
         }
         
@@ -406,8 +430,10 @@ class PigMacro {
      *      1: list of return values
      *      2: list of parameters
      */
-    static CommonTree macroInline(CommonTree t, List<PigMacro> macroDefs, Set<String> macroStack)
-            throws ParserException {
+    static CommonTree macroInline(CommonTree t,
+                                  List<PigMacro> macroDefs, Set<String> macroStack,
+                                  PigContext pigContext)
+                                  throws ParserException {
         // get name
         String mn = t.getChild(0).getText();
 
@@ -440,6 +466,10 @@ class PigMacro {
         Set<String> newStack = new HashSet<String>(macroStack);
         newStack.add(macro.name);
         macro.setStack(newStack);
+
+        // inform the macro of the PigContext
+        // so it can substitute parameters from the main pigscript
+        macro.setPigContext(pigContext);
         
         // get return values
         int n = t.getChild(1).getChildCount();

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Tue Aug  6 05:37:59 2013
@@ -66,6 +66,9 @@ NULL    : 'NULL'
 IMPORT  : 'IMPORT'
 ;
 
+// Keyword for REGISTER 'path' [USING <lang> AS <namespace>] statements (see register_clause in QueryParser.g).
+REGISTER : 'REGISTER'
+;
+
 RETURNS : 'RETURNS'
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Tue Aug  6 05:37:59 2013
@@ -216,6 +216,7 @@ statement : SEMI_COLON!
           | inline_clause SEMI_COLON!
           | import_clause SEMI_COLON!
           | realias_clause SEMI_COLON!
+          | register_clause SEMI_COLON!
           // semicolons after foreach_complex_statement are optional for backwards compatibility, but to keep
           // the grammar unambiguous if there is one then we'll parse it as a single, standalone semicolon
           // (which matches the first statement rule)
@@ -346,6 +347,9 @@ explicit_type_cast : simple_type | expli
 import_clause : IMPORT^ QUOTEDSTRING
 ;
 
+// REGISTER 'path' [USING <lang> AS <namespace>]; REGISTER becomes the root of the subtree.
+// The grammar does not check whether the USING..AS clause is required (non-jar paths).
+// QueryParserDriver.applyRegisters enforces that rule.
+register_clause : REGISTER^ QUOTEDSTRING (USING identifier_plus AS identifier_plus)?
+;
+
 define_clause : DEFINE^ IDENTIFIER ( cmd | func_clause | macro_clause)
 ;
 
@@ -956,6 +960,7 @@ nested_op_input_list : nested_op_input (
 // extended identifier, handling the keyword and identifier conflicts. Ugly but there is no other choice.
 eid_without_columns : rel_str_op
     | IMPORT
+    | REGISTER
     | RETURNS
     | DEFINE
     | LOAD

Modified: pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java Tue Aug  6 05:37:59 2013
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,6 +41,7 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -58,8 +60,10 @@ public class QueryParserDriver {
     private static final String MACRO_DEF = "MACRO_DEF";
     private static final String MACRO_INLINE = "MACRO_INLINE";
     private static final String IMPORT_DEF = "import";
+    private static final String REGISTER_DEF = "register";
 
     private PigContext pigContext;
+    private PigServer pigServer;
     private String scope;
     private Map<String, String>fileNameMap;
     private Map<String, Operator> operators;
@@ -71,6 +75,7 @@ public class QueryParserDriver {
 
     public QueryParserDriver(PigContext pigContext, String scope, Map<String, String> fileNameMap) {
         this.pigContext = pigContext;
+        this.pigServer = null; // lazily instantiated for register statements
         this.scope = scope;
         this.fileNameMap = fileNameMap;
         importSeen = new HashSet<String>();
@@ -176,6 +181,8 @@ public class QueryParserDriver {
 
         try{
             ast = validateAst( ast );
+            applyRegisters(ast);
+
             LogicalPlanGenerator planGenerator =
                 new LogicalPlanGenerator( new CommonTreeNodeStream( ast ), pigContext, scope, fileNameMap );
             planGenerator.query();
@@ -287,7 +294,7 @@ public class QueryParserDriver {
             List<PigMacro> macroDefs) throws ParserException {
         for (CommonTree t : inlineNodes) {
             Set<String> macroStack = new HashSet<String>();
-            CommonTree newTree = PigMacro.macroInline(t, macroDefs, macroStack);
+            CommonTree newTree = PigMacro.macroInline(t, macroDefs, macroStack, pigContext);
 
             List<CommonTree> nodes = new ArrayList<CommonTree>();
             traverseInline(newTree, nodes);
@@ -300,6 +307,51 @@ public class QueryParserDriver {
         }
     }
 
+    /**
+     * Recursively searches the AST for REGISTER statements.
+     * Each one is executed against the lazily created PigServer before the logical
+     * plan is generated. The children of a REGISTER node are not searched further.
+     *
+     * A path ending in ".jar" is registered as a jar and must not carry a USING..AS
+     * clause. Any other path is registered as scripting UDF code and requires one
+     * (REGISTER 'path' USING lang AS namespace).
+     *
+     * @param t root of the (sub)tree to search
+     * @throws ExecException propagated from creating the PigServer
+     * @throws ParserException if a REGISTER statement is malformed or registration fails
+     */
+    private void applyRegisters(Tree t) throws ExecException, ParserException {
+        // REGISTER is also accepted as an extended identifier (eid_without_columns), so a
+        // childless token with this text can appear elsewhere in the tree. Only a node that
+        // has children (the quoted path first) is a REGISTER statement.
+        if (REGISTER_DEF.equalsIgnoreCase(t.getText()) && t.getChildCount() > 0) {
+            String path = QueryParserUtils.removeQuotes(t.getChild(0).getText());
+
+            if (path.endsWith(".jar")) {
+                if (t.getChildCount() != 1) {
+                    throw new ParserException("REGISTER statement refers to JAR but has a USING..AS scripting engine clause. " +
+                                              "Statement: " + t.toStringTree());
+                }
+
+                try {
+                    getPigServer().registerJar(path);
+                } catch (IOException ioe) {
+                    throw new ParserException(ioe.getMessage());
+                }
+            } else {
+                if (t.getChildCount() != 5) {
+                    throw new ParserException("REGISTER statement for non-JAR file requires a USING scripting_lang AS namespace clause. " +
+                                              "Ex. REGISTER 'my_file.py' USING jython AS my_jython_udfs;");
+                }
+
+                // children: QUOTEDSTRING, USING, <lang>, AS, <namespace>
+                String scriptingLang = t.getChild(2).getText();
+                String namespace = t.getChild(4).getText();
+
+                try {
+                    getPigServer().registerCode(path, scriptingLang, namespace);
+                } catch (IOException ioe) {
+                    throw new ParserException(ioe.getMessage());
+                }
+            }
+        } else {
+            for (int i = 0; i < t.getChildCount(); i++) {
+                applyRegisters(t.getChild(i));
+            }
+        }
+    }
+
+    /**
+     * Returns the PigServer used to execute REGISTER statements.
+     * The server is created from this driver's PigContext on first use and reused afterwards.
+     *
+     * @throws ExecException propagated from the PigServer constructor
+     */
+    private PigServer getPigServer() throws ExecException {
+        if (pigServer != null) {
+            return pigServer;
+        }
+        pigServer = new PigServer(pigContext, false);
+        return pigServer;
+    }
+
     private void traverseInline(Tree t, List<CommonTree> nodes) {
         if (t.getText().equals(MACRO_INLINE)) {
             nodes.add((CommonTree)t);
@@ -472,49 +524,63 @@ public class QueryParserDriver {
         String fname = t.getChild(0).getText();
         fname = QueryParserUtils.removeQuotes(fname);
         if (!importSeen.add(fname)) {
-            String msg = getErrorMessage(fname, t,
-                    ": Duplicated import file '" + fname + "'", null);
-            LOG.warn(msg);
-            // Disabling this exception, see https://issues.apache.org/jira/browse/PIG-2279
-            // throw new ParserException(msg);
+            // we've already imported this file, so just skip this import statement
+            LOG.debug("Ignoring duplicated import " + fname);
+            t.getParent().deleteChild(t.getChildIndex());
+            return;
         }
 
-        FetchFileRet localFileRet = getMacroFile(fname);
+        Tree macroAST = null;
+        if (pigContext.macros.containsKey(fname)) {
+            macroAST = pigContext.macros.get(fname);
+        } else {
+            FetchFileRet localFileRet = getMacroFile(fname);
 
-        BufferedReader in = null;
-        try {
-            in = new BufferedReader(new FileReader(localFileRet.file));
-        } catch (FileNotFoundException e) {
-            String msg = getErrorMessage(fname, t,
-                    "Failed to import file '" + fname + "'", e.getMessage());
-            throw new ParserException(msg);
-        }
+            BufferedReader in = null;
+            try {
+                in = new BufferedReader(new FileReader(localFileRet.file));
+            } catch (FileNotFoundException e) {
+                String msg = getErrorMessage(fname, t,
+                        "Failed to import file '" + fname + "'", e.getMessage());
+                throw new ParserException(msg);
+            }
 
-        StringBuilder sb = new StringBuilder();
-        String line = null;
-        try {
-            line = in.readLine();
-            while (line != null) {
-                sb.append(line).append("\n");
+            StringBuilder sb = new StringBuilder();
+            String line = null;
+            try {
                 line = in.readLine();
+                while (line != null) {
+                    sb.append(line).append("\n");
+                    line = in.readLine();
+                }
+            } catch (IOException e) {
+                String msg = getErrorMessage(fname, t,
+                        "Failed to read file '" + fname + "'", e.getMessage());
+                throw new ParserException(msg);
             }
-        } catch (IOException e) {
-            String msg = getErrorMessage(fname, t,
-                    "Failed to read file '" + fname + "'", e.getMessage());
-            throw new ParserException(msg);
-        }
 
-        // parse
-        CommonTokenStream tokenStream = tokenize(sb.toString(), fname);
+            String macroText = null;
+            try {
+                in = new BufferedReader(new StringReader(sb.toString()));
+                macroText = pigContext.doParamSubstitution(in);
+            } catch (IOException e) {
+                String msg = getErrorMessage(fname, t,
+                    "Parameter sustitution failed for macro.", e.getMessage());
+                throw new ParserException(msg);
+            }
 
-        Tree ast = null;
-        try {
-            ast = parse( tokenStream );
-        } catch(RuntimeException ex) {
-            throw new ParserException( ex.getMessage() );
+            // parse
+            CommonTokenStream tokenStream = tokenize(macroText, fname);
+
+            try {
+                macroAST = parse( tokenStream );
+                pigContext.macros.put(fname, macroAST);
+            } catch(RuntimeException ex) {
+                throw new ParserException( ex.getMessage() );
+            }
         }
 
-        QueryParserUtils.replaceNodeWithNodeList(t, (CommonTree)ast, fname);
+        QueryParserUtils.replaceNodeWithNodeList(t, (CommonTree)macroAST, fname);
     }
 
     private String getErrorMessage(String importFile,

Modified: pig/trunk/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/Pig.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/Pig.java Tue Aug  6 05:37:59 2013
@@ -23,9 +23,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
 import java.io.StringReader;
-import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,14 +33,14 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FsShell;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.grunt.GruntParser;
-import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 
 /**
  * The class being used in scripts to interact with Pig
  */
 public class Pig {
-    
+
     private static final Log LOG = LogFactory.getLog(Pig.class);
 
     private static List<String> defineCache = new ArrayList<String>();
@@ -62,7 +60,7 @@ public class Pig {
                 .getPigContext().getProperties()));
         int code = -1;
         if (cmd != null) {
-            String[] cmdTokens = cmd.split("\\s+");         
+            String[] cmdTokens = cmd.split("\\s+");
             if (!cmdTokens[0].startsWith("-")) cmdTokens[0] = "-" + cmdTokens[0];
             try {
                 code = shell.run(cmdTokens);
@@ -72,7 +70,7 @@ public class Pig {
         }
         return code;
     }
-    
+
     /**
      * Run a sql command.  Any output from this command is written to
      * stdout or stderr as appropriate.
@@ -98,8 +96,8 @@ public class Pig {
 
     /**
      * Register a jar for use in Pig.  Once this is done this jar will be
-     * registered for <b>all subsequent</b> Pig pipelines in this script.  
-     * If you wish to register it for only a single Pig pipeline, use 
+     * registered for <b>all subsequent</b> Pig pipelines in this script.
+     * If you wish to register it for only a single Pig pipeline, use
      * register within that definition.
      * @param jarfile Path of jar to include.
      * @throws IOException if the indicated jarfile cannot be found.
@@ -113,8 +111,8 @@ public class Pig {
 
     /**
      * Register scripting UDFs for use in Pig. Once this is done all UDFs
-     * defined in the file will be available for <b>all subsequent</b> 
-     * Pig pipelines in this script. If you wish to register UDFS for 
+     * defined in the file will be available for <b>all subsequent</b>
+     * Pig pipelines in this script. If you wish to register UDFS for
      * only a single Pig pipeline, use register within that definition.
      * @param udffile Path of the script UDF file
      * @param namespace namespace of the UDFs
@@ -125,16 +123,16 @@ public class Pig {
         LOG.info("Register script UFD file: "+ udffile);
         ScriptPigContext ctx = getScriptContext();
         ScriptEngine engine = ctx.getScriptEngine();
-        // script file contains only functions, no need to separate 
+        // script file contains only functions, no need to separate
         // functions from control flow code
         if (namespace != null && namespace.isEmpty()) namespace = null;
         engine.registerFunctions(udffile, namespace, ctx.getPigContext());
-        addRegisterScriptUDFClause(udffile, namespace);        
+        addRegisterScriptUDFClause(udffile, namespace);
     }
 
     /**
      * Define an alias for a UDF or a streaming command.  This definition
-     * will then be present for <b>all subsequent</b> Pig pipelines defined in this 
+     * will then be present for <b>all subsequent</b> Pig pipelines defined in this
      * script.  If you wish to define it for only a single Pig pipeline, use
      * define within that definition.
      * @param alias name of the defined alias
@@ -148,7 +146,7 @@ public class Pig {
 
     /**
      * Set a variable for use in Pig Latin.  This set
-     * will then be present for <b>all subsequent</b> Pig pipelines defined in this 
+     * will then be present for <b>all subsequent</b> Pig pipelines defined in this
      * script.  If you wish to set it for only a single Pig pipeline, use
      * set within that definition.
      * @param var variable to set
@@ -159,14 +157,14 @@ public class Pig {
         PigServer pigServer = new PigServer(ctx.getPigContext(), false);
         pigServer.getPigContext().getProperties().setProperty(var, value);
     }
-            
+
     /**
-     * Define a Pig pipeline.  
+     * Define a Pig pipeline.
      * @param pl Pig Latin definition of the pipeline.
      * @return Pig object representing this pipeline.
      * @throws IOException if the Pig Latin does not compile.
      */
-    public static Pig compile(String pl) throws IOException { 
+    public static Pig compile(String pl) throws IOException {
         return compile(null, pl);
     }
 
@@ -189,7 +187,7 @@ public class Pig {
 
     /**
      * Define a Pig pipeline based on Pig Latin in a separate file.
-     * @param filename File to read Pig Latin from.  This must be a purely 
+     * @param filename File to read Pig Latin from.  This must be a purely
      * Pig Latin file.  It cannot contain host language constructs in it.
      * @return Pig object representing this pipeline.
      * @throws IOException if the Pig Latin does not compile or the file
@@ -205,7 +203,7 @@ public class Pig {
      * This allows it to be imported into another pipeline.
      * @param name Name that will be used to define this pipeline.
      * The namespace is global.
-     * @param filename File to read Pig Latin from.  This must be a purely 
+     * @param filename File to read Pig Latin from.  This must be a purely
      * Pig Latin file.  It cannot contain host language constructs in it.
      * @return Pig object representing this pipeline.
      * @throws IOException if the Pig Latin does not compile or the file
@@ -217,31 +215,31 @@ public class Pig {
     }
 
     //-------------------------------------------------------------------------
-   
+
     /**
      * Bind this to a set of variables. Values must be provided
      * for all Pig Latin parameters.
-     * @param vars map of variables to bind.  Keys should be parameters defined 
+     * @param vars map of variables to bind.  Keys should be parameters defined
      * in the Pig Latin.  Values should be strings that provide values for those
      * parameters.  They can be either constants or variables from the host
      * language.  Host language variables must contain strings.
-     * @return a {@link BoundScript} object 
+     * @return a {@link BoundScript} object
      * @throws IOException if there is not a key for each
      * Pig Latin parameter or if they contain unsupported types.
      */
     public BoundScript bind(Map<String, Object> vars) throws IOException {
         return new BoundScript(replaceParameters(script, vars), scriptContext, name);
     }
-        
+
     /**
-     * Bind this to multiple sets of variables.  This will 
-     * cause the Pig Latin script to be executed in parallel over these sets of 
+     * Bind this to multiple sets of variables.  This will
+     * cause the Pig Latin script to be executed in parallel over these sets of
      * variables.
-     * @param vars list of maps of variables to bind.  Keys should be parameters defined 
+     * @param vars list of maps of variables to bind.  Keys should be parameters defined
      * in the Pig Latin.  Values should be strings that provide values for those
      * variables.  They can be either constants or variables from the host
      * language.  Host language variables must be strings.
-     * @return a {@link BoundScript} object 
+     * @return a {@link BoundScript} object
      * @throws IOException  if there is not a key for each
      * Pig Latin parameter or if they contain unsupported types.
      */
@@ -261,7 +259,7 @@ public class Pig {
      * <tt> p = Pig.compile("A = load '$input';");</tt>
      * and then calls this function it will look for a variable called
      * <tt>input</tt> in the host language.  Scoping rules of the host
-     * language will be followed in selecting which variable to bind.  The 
+     * language will be followed in selecting which variable to bind.  The
      * variable bound must contain a string value.  This method is optional
      * because not all host languages may support searching for in scope
      * variables.
@@ -277,21 +275,21 @@ public class Pig {
         Map<String, Object> vars = engine.getParamsFromVariables();
         return bind(vars);
     }
-    
+
     //-------------------------------------------------------------------------
-        
+
     private String script = null;
 
     private ScriptPigContext scriptContext = null;
 
     private String name = null;
-    
+
     protected Pig(String script, ScriptPigContext scriptContext, String name) {
         this.script = script;
         this.scriptContext = scriptContext;
-        this.name = name;       
+        this.name = name;
     }
-    
+
     /**
      * Replaces the $<identifier> with their actual values
      * @param qstr the pig script to rewrite
@@ -300,45 +298,36 @@ public class Pig {
      */
     private String replaceParameters(String qstr, Map<String, Object> vars)
             throws IOException {
-        ArrayList<String> plist = new ArrayList<String>();
+
+        List<String> params = new ArrayList<String>();
         for (Entry<String, Object> entry : vars.entrySet()) {
-            plist.add(entry.getKey() + "="
+            params.add(entry.getKey() + "="
                     + fixNonEscapedDollarSign(entry.getValue().toString()));
         }
-        if (getScriptContext().getPigContext().getParams()!=null) {
-            for (String param : getScriptContext().getPigContext().getParams()) {
-                plist.add(param);
+
+        PigContext context = getScriptContext().getPigContext();
+        List<String> contextParams = context.getParams();
+        if (contextParams != null) {
+            for (String param : contextParams) {
+                params.add(param);
             }
         }
-        
-        ParameterSubstitutionPreprocessor psp = 
-            new ParameterSubstitutionPreprocessor(50);
-        
-        String[] params = new String[1];
-        
-        StringWriter writer = new StringWriter();
-        BufferedReader in = new BufferedReader(new StringReader(qstr));
-        String[] type1 = new String[1];
-        try {
-            psp.genSubstitutedFile(in, writer, plist.toArray(params), 
-                    scriptContext.getPigContext().getParamFiles()!=null && 
-                    scriptContext.getPigContext().getParamFiles().size() > 0 ? 
-                    scriptContext.getPigContext().getParamFiles().toArray(type1) : null);
-        } catch (org.apache.pig.tools.parameters.ParseException e) {
-            throw new IOException("Param substitution failed", e);            
-        } 
-        return writer.toString();
+
+        // The call below replaces the params stored on the PigContext with the bound ones.
+        // Restore the original params in a finally block, so a failed substitution does
+        // not leave the bound values behind on the shared context.
+        BufferedReader reader = new BufferedReader(new StringReader(qstr));
+        try {
+            return context.doParamSubstitution(reader, params, context.getParamFiles());
+        } finally {
+            context.setParams(contextParams); // reset params that were originally in PigContext
+        }
     }
-    
-    // Escape the $ so that we can use the parameter substitution 
+
+    // Escape the $ so that we can use the parameter substitution
     // to perform bind operation. Parameter substitution will un-escape $
     private static String fixNonEscapedDollarSign(String s) {
         String[] tkns = s.split("\\$", -1);
 
         if (tkns.length == 1) return s;
-        
+
         StringBuilder sb = new StringBuilder();
-        
+
         for (int i = 0; i < tkns.length -1; i++) {
             if (tkns[i].isEmpty()) {
                 sb.append("\\\\");
@@ -354,7 +343,7 @@ public class Pig {
 
         return sb.toString();
     }
-    
+
     //-------------------------------------------------------------------------
 
     private static String getScriptFromFile(String filename) throws IOException {
@@ -410,5 +399,5 @@ public class Pig {
         return ctx;
     }
 
-    
+
 }

Modified: pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java (original)
+++ pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java Tue Aug  6 05:37:59 2013
@@ -103,7 +103,7 @@ public class ToolsPigServer extends PigS
         FileInputStream fis = null;
         try{
             fis = new FileInputStream(fileName);
-            substituted = doParamSubstitution(fis, params, paramFiles);
+            substituted = pigContext.doParamSubstitution(fis, paramMapToList(params), paramFiles);
         }catch (FileNotFoundException e){
             log.error(e.getLocalizedMessage());
             throw new IOException(e.getCause());
@@ -151,13 +151,13 @@ public class ToolsPigServer extends PigS
      */
     public List<ExecJob> runPlan(LogicalPlan newPlan,
                                  String jobName) throws FrontendException, ExecException {
-    	
+
         HExecutionEngine engine = new HExecutionEngine(pigContext);
         PhysicalPlan pp = engine.compile(newPlan, null);
         PigStats stats = launchPlan(pp, jobName);
-        return getJobs(stats);                        
+        return getJobs(stats);
     }
-            
+
     public static class PigPlans {
 
         public LogicalPlan lp;

Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Aug  6 05:37:59 2013
@@ -30,7 +30,6 @@ import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.Reader;
 import java.io.StringReader;
-import java.io.StringWriter;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,11 +64,11 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.TupleFormat;
-import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.apache.pig.tools.pigscript.parser.PigScriptParser;
 import org.apache.pig.tools.pigscript.parser.PigScriptParserTokenManager;
@@ -457,23 +456,21 @@ public class GruntParser extends PigScri
         }
     }
 
-    private String runPreprocessor(String script, List<String> params,
-                                   List<String> files)
+    /**
+     * Returns the script at scriptPath after running it through
+     * PigContext.doParamSubstitution with the given params and paramFiles.
+     */
+    private String runPreprocessor(String scriptPath, List<String> params, List<String> paramFiles)
         throws IOException, ParseException {
 
-        ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
-        StringWriter writer = new StringWriter();
-
-        try{
-            psp.genSubstitutedFile(new BufferedReader(new FileReader(script)),
-                                   writer,
-                                   params.size() > 0 ? params.toArray(new String[0]) : null,
-                                   files.size() > 0 ? files.toArray(new String[0]) : null);
-        } catch (org.apache.pig.tools.parameters.ParseException pex) {
-            throw new ParseException(pex.getMessage());
-        }
-
-        return writer.toString();
+        PigContext context = mPigServer.getPigContext();
+        BufferedReader reader = new BufferedReader(new FileReader(scriptPath));
+        try {
+            return context.doParamSubstitution(reader, params, paramFiles);
+        } finally {
+            // close the script file even when substitution throws
+            reader.close();
+        }
     }
 
     @Override
@@ -512,6 +509,9 @@ public class GruntParser extends PigScri
         ConsoleReader reader;
         boolean interactive;
 
+        mPigServer.getPigContext().setParams(params);
+        mPigServer.getPigContext().setParamFiles(files);
+
         try {
             FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
             String cmds = runPreprocessor(fetchFile.file.getAbsolutePath(), params, files);

Modified: pig/trunk/src/org/apache/pig/tools/parameters/ParameterSubstitutionPreprocessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/parameters/ParameterSubstitutionPreprocessor.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/parameters/ParameterSubstitutionPreprocessor.java (original)
+++ pig/trunk/src/org/apache/pig/tools/parameters/ParameterSubstitutionPreprocessor.java Tue Aug  6 05:37:59 2013
@@ -24,22 +24,17 @@ package org.apache.pig.tools.parameters;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
 
-import java.util.Hashtable;
-import java.io.FileInputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.BufferedReader;
 import java.io.StringReader;
 import java.io.Writer;
+import java.util.Arrays;
 
 public class ParameterSubstitutionPreprocessor {
 
     private PreprocessorContext pc;
     private final Log log = LogFactory.getLog(getClass());
-    private ParamLoader paramParser;
     private PigFileParser pigParser;
 
     /**
@@ -47,11 +42,12 @@ public class ParameterSubstitutionPrepro
      * would not cause incorrect behavior but would impact performance
      */
     public ParameterSubstitutionPreprocessor(int limit) {
-        pc = new PreprocessorContext(50);
-        StringReader sr = null;
+        this(new PreprocessorContext(limit));
+    }
 
-        paramParser  = new ParamLoader(sr);
-        paramParser.setContext(pc);
+    public ParameterSubstitutionPreprocessor(PreprocessorContext pc) {
+        this.pc = pc;
+        StringReader sr = null;
         pigParser = new PigFileParser(sr);
         pigParser.setContext(pc);
     }
@@ -60,27 +56,9 @@ public class ParameterSubstitutionPrepro
      * This is the main API that takes script template and produces pig script 
      * @param pigInput - input stream that contains pig file
      * @param pigOutput - stream where transformed file is written
-     * @param args - command line arguments in the order they appear on the command line; format: key=val
-     * @param argFiles - list of configuration files in the order they appear on the command line
+     * @throws ParseException if parameter substitution on the script fails
      */
-    public void genSubstitutedFile (BufferedReader pigInput, Writer pigOutput, String[] args, String[] argFiles) throws ParseException {
-
-        // load parameters from the files followed by parameters
-        // from command line both in the order they appear on the command line
-        // this enforces precedence rules 
-        if (argFiles!=null) {
-            for (int i=0;i<argFiles.length;i++) {
-                if (argFiles[i].length() > 0)
-                    loadParamsFromFile(argFiles[i]);
-            }
-        }
-        if (args!=null) {
-            for (int i=0;i<args.length;i++) {
-                if (args[i].length() > 0)
-                    loadParamsFromCmdline(args[i]);
-            }
-        }
-
+    public void genSubstitutedFile(BufferedReader pigInput, Writer pigOutput) throws ParseException {
         // In case there is no EOL before EOF, add EOL for each line
         String line = null;
         StringBuilder blder = new StringBuilder();       
@@ -93,11 +71,23 @@ public class ParameterSubstitutionPrepro
         }
                 
         pigInput = new BufferedReader(new StringReader(blder.toString()));
-        
+
         // perform the substitution
         parsePigFile(pigInput , pigOutput);
     }
 
+    // Kept for compatibility with the old interface; IOExceptions are rethrown as ParseException with the cause attached
+    public void genSubstitutedFile(BufferedReader pigInput, Writer pigOutput,
+                                   String[] params, String[] paramFiles) throws ParseException {
+        try {
+            pc.loadParamVal(params == null ? null : Arrays.asList(params),
+                            paramFiles == null ? null : Arrays.asList(paramFiles));
+            genSubstitutedFile(pigInput, pigOutput);
+        } catch (IOException e) {
+            throw (ParseException) new ParseException(e.getMessage()).initCause(e);
+        }
+    }
+
     private void parsePigFile(BufferedReader in, Writer out) throws ParseException {
         pigParser.setOutputWriter(out);
         pigParser.ReInit(in);
@@ -112,52 +102,4 @@ public class ParameterSubstitutionPrepro
             throw rte;
         }
     }
-
-    /* 
-     * populates the param-val hashtable with parameters read from a file
-     * @param filename - name of the config file
-     */
-    private void loadParamsFromFile(String filename) throws ParseException {
-
-        try {
-            BufferedReader in = new BufferedReader(new FileReader(filename));
-            String line;
- 
-            paramParser.ReInit(in);
-            while (paramParser.Parse()) {}
-            in.close();
-        }catch(org.apache.pig.tools.parameters.ParseException e){
-        	log.info("The file: \""+filename+"\" contains parameter that cannot be parsed by Pig in line. Please double check it");
-        	log.info("Parser give the follow error message:");
-        	log.info(e.getMessage());
-        	throw e;
-        } catch (IOException e) {
-            RuntimeException rte = new RuntimeException(e.getMessage() , e);
-            throw rte;
-        }
-
-    }
-
-    /*
-     * adds key-val pairs from cmd line to the param-val hashtable 
-     * @param param contains key-val of the form key='value'
-     */
-    private void loadParamsFromCmdline(String line) throws ParseException {
-        try {
-            // new lines are needed by the parser
-            paramParser.ReInit(new StringReader(line));
-            paramParser.Parse();
-        } catch(org.apache.pig.tools.parameters.ParseException e){
-        	log.info("The parameter: \""+line+"\" cannot be parsed by Pig. Please double check it");
-        	log.info("Parser give the follow error message:");
-        	log.info(e.getMessage());
-        	throw e;
-        }catch (IOException e) {
-            RuntimeException rte = new RuntimeException(e.getMessage() , e);
-            throw rte;
-        }
-
-    }
-
-
 }

Modified: pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java Tue Aug  6 05:37:59 2013
@@ -23,9 +23,13 @@
 package org.apache.pig.tools.parameters;
 
 import java.io.BufferedReader;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringReader;
 import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -34,7 +38,15 @@ import org.apache.commons.logging.LogFac
 
 public class PreprocessorContext {
 
-    private Hashtable<String , String> param_val ;
+    private Map<String, String> param_val;
+
+    // source value each param was last set from; lets processShellCmd/processOrdLine
+    // return early, without a warning, when a param is set again to the same value
+    private Map<String, String> param_source;
+
+    public Map<String, String> getParamVal() { // the live backing map, not a copy
+        return param_val;
+    }
 
     private final Log log = LogFactory.getLog(getClass());
 
@@ -42,8 +54,14 @@ public class PreprocessorContext {
      * @param limit - max number of parameters. Passing
      *                smaller number only impacts performance
      */
-    public PreprocessorContext(int limit){
+    public PreprocessorContext(int limit) {
         param_val = new Hashtable<String, String> (limit);
+        param_source = new Hashtable<String, String> (limit);
+    }
+
+    public PreprocessorContext(Map<String, String> paramVal) {
+        param_val = paramVal;
+        param_source = new Hashtable<String, String>(paramVal);
     }
 
     /*
@@ -96,14 +114,17 @@ public class PreprocessorContext {
     public  void processShellCmd(String key, String val, Boolean overwrite) {
 
         if (param_val.containsKey(key)) {
-            if (overwrite) {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
-            } else {
+            // null-safe: param_val can contain keys that param_source lacks, e.g. ones added via getParamVal()
+            if (val.equals(param_source.get(key)) || !overwrite) {
                 return;
+            } else {
+                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
             }
         }
 
-        val=val.substring(1, val.length()-1); //to remove the backticks
+        param_source.put(key, val);
+
+        val = val.substring(1, val.length()-1); //to remove the backticks
         String sub_val = substitute(val);
         sub_val = executeShellCommand(sub_val);
         param_val.put(key, sub_val);
@@ -120,13 +141,16 @@ public class PreprocessorContext {
     public  void processOrdLine(String key, String val, Boolean overwrite) {
 
         if (param_val.containsKey(key)) {
-            if (overwrite) {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
-            } else {
+            // null-safe: param_val can contain keys that param_source lacks, e.g. ones added via getParamVal()
+            if (val.equals(param_source.get(key)) || !overwrite) {
                 return;
+            } else {
+                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
             }
         }
 
+        param_source.put(key, val);
+
         String sub_val = substitute(val);
         param_val.put(key, sub_val);
     }
@@ -210,6 +234,42 @@ public class PreprocessorContext {
         return streamData.trim();
     }
 
+    /**
+     * Loads parameter values into this context: first every file in
+     * paramFiles, then every key=value string in params, each in list order.
+     *
+     * @param params - parameter strings to parse; may be null
+     * @param paramFiles - paths of parameter files to parse; may be null
+     * @throws IOException if a parameter file cannot be opened or read
+     * @throws ParseException if the parameter loader fails on a file or string
+     */
+    public void loadParamVal(List<String> params, List<String> paramFiles)
+                throws IOException, ParseException {
+        StringReader dummyReader = null; // ParamLoader does not have an empty constructor
+        ParamLoader paramLoader = new ParamLoader(dummyReader);
+        paramLoader.setContext(this);
+
+        if (paramFiles != null) {
+            for (String path : paramFiles) {
+                BufferedReader in = new BufferedReader(new FileReader(path));
+                try {
+                    paramLoader.ReInit(in);
+                    while (paramLoader.Parse()) {}
+                } finally {
+                    // close the file even if parsing fails
+                    in.close();
+                }
+            }
+        }
+
+        if (params != null) {
+            for (String param : params) {
+                paramLoader.ReInit(new StringReader(param));
+                paramLoader.Parse();
+            }
+        }
+    }
+
     private Pattern bracketIdPattern = Pattern.compile("\\$\\{([_]*[a-zA-Z][a-zA-Z_0-9]*)\\}");
     private Pattern id_pattern = Pattern.compile("\\$([_]*[a-zA-Z][a-zA-Z_0-9]*)");
 

Modified: pig/trunk/test/org/apache/pig/pigunit/PigTest.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/PigTest.java?rev=1510858&r1=1510857&r2=1510858&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/pigunit/PigTest.java (original)
+++ pig/trunk/test/org/apache/pig/pigunit/PigTest.java Tue Aug  6 05:37:59 2013
@@ -18,10 +18,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringReader;
-import java.io.StringWriter;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,10 +34,10 @@ import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.pigunit.pig.PigServer;
-import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
 
 /**
@@ -153,13 +153,12 @@ public class PigTest {
   protected void registerScript() throws IOException, ParseException {
     getCluster();
 
-    BufferedReader pigIStream = new BufferedReader(new StringReader(this.originalTextPigScript));
-    StringWriter pigOStream = new StringWriter();
+    BufferedReader reader = new BufferedReader(new StringReader(this.originalTextPigScript));
+    PigContext context = getPigServer().getPigContext();
 
-    ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
-    ps.genSubstitutedFile(pigIStream, pigOStream, args, argFiles);
-
-    String substitutedPig = pigOStream.toString();
+    String substitutedPig = context.doParamSubstitution(reader,
+                                                        args == null ? null : Arrays.asList(args),
+                                                        argFiles == null ? null : Arrays.asList(argFiles));
     LOG.info(substitutedPig);
 
     File f = File.createTempFile("tmp", "pigunit");