You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/01/08 22:13:48 UTC

svn commit: r1056801 - in /pig/trunk: CHANGES.txt src/org/apache/pig/PigServer.java test/org/apache/pig/test/TestPigServer.java

Author: dvryaboy
Date: Sat Jan  8 21:13:46 2011
New Revision: 1056801

URL: http://svn.apache.org/viewvc?rev=1056801&view=rev
Log:
PIG-1675: allow PigServer to register pig script from InputStream

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jan  8 21:13:46 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1675: allow PigServer to register pig script from InputStream (zjffdu via dvryaboy)
+
 PIG-1479: Embed Pig in scripting languages (rding)
 
 PIG-946: Combiner optimizer does not optimize when limit follow group, foreach (thejas)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Sat Jan  8 21:13:46 2011
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.io.StringReader;
@@ -116,20 +117,20 @@ import org.apache.pig.tools.pigstats.Pig
 
 
 /**
- * 
+ *
  * A class for Java programs to connect to Pig. Typically a program will create a PigServer
  * instance. The programmer then registers queries using registerQuery() and
  * retrieves results using openIterator() or store(). After doing so, the
  * shutdown() method should be called to free any resources used by the current
  * PigServer instance. Not doing so could result in a memory leak.
- * 
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class PigServer {
-    
+
     private final Log log = LogFactory.getLog(getClass());
-    
+
     /**
      * Given a string, determine the exec type.
      * @param str accepted values are 'local', 'mapreduce', and 'mapred'
@@ -137,44 +138,54 @@ public class PigServer {
      */
     public static ExecType parseExecType(String str) throws IOException {
         String normStr = str.toLowerCase();
-        
-        if (normStr.equals("local")) return ExecType.LOCAL;
-        if (normStr.equals("mapreduce")) return ExecType.MAPREDUCE;
-        if (normStr.equals("mapred")) return ExecType.MAPREDUCE;
-        if (normStr.equals("pig")) return ExecType.PIG;
-        if (normStr.equals("pigbody")) return ExecType.PIG;
-   
+
+        if (normStr.equals("local")) {
+            return ExecType.LOCAL;
+        }
+        if (normStr.equals("mapreduce")) {
+            return ExecType.MAPREDUCE;
+        }
+        if (normStr.equals("mapred")) {
+            return ExecType.MAPREDUCE;
+        }
+        if (normStr.equals("pig")) {
+            return ExecType.PIG;
+        }
+        if (normStr.equals("pigbody")) {
+            return ExecType.PIG;
+        }
+
         int errCode = 2040;
         String msg = "Unknown exec type: " + str;
         throw new PigException(msg, errCode, PigException.BUG);
     }
 
     /*
-     * The data structure to support grunt shell operations. 
-     * The grunt shell can only work on one graph at a time. 
+     * The data structure to support grunt shell operations.
+     * The grunt shell can only work on one graph at a time.
      * If a script is contained inside another script, the grunt
-     * shell first saves the current graph on the stack and works 
-     * on a new graph. After the nested script is done, the grunt 
+     * shell first saves the current graph on the stack and works
+     * on a new graph. After the nested script is done, the grunt
      * shell pops up the saved graph and continues working on it.
      */
-    private Stack<Graph> graphs = new Stack<Graph>();
-    
+    private final Stack<Graph> graphs = new Stack<Graph>();
+
     /*
      * The current Graph the grunt shell is working on.
      */
     private Graph currDAG;
- 
-    private PigContext pigContext;
-    
+
+    private final PigContext pigContext;
+
     private static int scopeCounter = 0;
-    private String scope = constructScope();
+    private final String scope = constructScope();
 
     private boolean aggregateWarning = true;
     private boolean isMultiQuery = true;
-    
+
     private String constructScope() {
         // scope servers for now as a session id
-        
+
         // String user = System.getProperty("user.name", "DEFAULT_USER_ID");
         // String date = (new Date()).toString();
 
@@ -185,9 +196,9 @@ public class PigServer {
         // operators to not include scope in their name().
         return ""+(++scopeCounter);
     }
-    
+
     /**
-     * @param execTypeString can be 'mapreduce' or 'local'.  Local mode will 
+     * @param execTypeString can be 'mapreduce' or 'local'.  Local mode will
      * use Hadoop's local job runner to execute the job on the local machine.
      * Mapreduce mode will connect to a cluster to execute the job.
      * @throws ExecException
@@ -196,9 +207,9 @@ public class PigServer {
     public PigServer(String execTypeString) throws ExecException, IOException {
         this(parseExecType(execTypeString));
     }
-    
+
     /**
-     * @param execType execution type to start the engine.  Local mode will 
+     * @param execType execution type to start the engine.  Local mode will
      * use Hadoop's local job runner to execute the job on the local machine.
      * Mapreduce mode will connect to a cluster to execute the job.
      * @throws ExecException
@@ -210,25 +221,25 @@ public class PigServer {
     public PigServer(ExecType execType, Properties properties) throws ExecException {
         this(new PigContext(execType, properties));
     }
-  
+
     public PigServer(PigContext context) throws ExecException {
         this(context, true);
     }
-    
+
     public PigServer(PigContext context, boolean connect) throws ExecException {
         this.pigContext = context;
         currDAG = new Graph(false);
-        
+
         aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
         isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
 
         if (connect) {
             pigContext.connect();
         }
-        
+
         addJarsFromProperties();
     }
-    
+
     private void addJarsFromProperties() throws ExecException {
         //add jars from properties to extraJars
         String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");
@@ -238,7 +249,7 @@ public class PigServer {
                     registerJar(jar);
                 } catch (IOException e) {
                     int errCode = 4010;
-                    String msg = 
+                    String msg =
                         "Failed to register jar :" + jar + ". Caught exception.";
                     throw new ExecException(
                             msg,
@@ -254,7 +265,7 @@ public class PigServer {
     public PigContext getPigContext(){
         return pigContext;
     }
-    
+
     /**
      * Set the logging level to DEBUG.
      */
@@ -262,7 +273,7 @@ public class PigServer {
         Logger.getLogger("org.apache.pig").setLevel(Level.DEBUG);
         pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", Level.DEBUG.toString());
     }
-    
+
     /**
      * Set the logging level to the default.
      */
@@ -270,7 +281,7 @@ public class PigServer {
         Logger.getLogger("org.apache.pig").setLevel(pigContext.getDefaultLogLevel());
         pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", pigContext.getDefaultLogLevel().toString());
     }
-    
+
     /**
      * Set the default parallelism for this job
      * @param p default number of reducers to use for this job.
@@ -278,13 +289,13 @@ public class PigServer {
     public void setDefaultParallel(int p) {
         pigContext.defaultParallel = p;
     }
- 
+
     /**
      * Starts batch execution mode.
      */
     public void setBatchOn() {
         log.debug("Create a new graph.");
-        
+
         if (currDAG != null) {
             graphs.push(currDAG);
         }
@@ -293,7 +304,7 @@ public class PigServer {
 
     /**
      * Retrieve the current execution mode.
-     * 
+     *
      * @return true if the execution mode is batch; false otherwise.
      */
     public boolean isBatchOn() {
@@ -320,8 +331,8 @@ public class PigServer {
     }
 
     /**
-     * Submits a batch of Pig commands for execution. 
-     * 
+     * Submits a batch of Pig commands for execution.
+     *
      * @return list of jobs being executed
      * @throws FrontendException
      * @throws ExecException
@@ -334,7 +345,7 @@ public class PigServer {
         while (iter.hasNext()) {
             JobStats js = iter.next();
             for (OutputStats output : js.getOutputs()) {
-                if (js.isSuccessful()) {                
+                if (js.isSuccessful()) {
                     jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
                             .getPOStore(), output.getAlias(), stats));
                 } else {
@@ -359,13 +370,13 @@ public class PigServer {
             String msg = "setBatchOn() must be called first.";
             throw new FrontendException(msg, errCode, PigException.INPUT);
         }
-        
+
         return currDAG.execute();
     }
-    
+
     /**
      * Discards a batch of Pig commands.
-     * 
+     *
      * @throws FrontendException
      */
     public void discardBatch() throws FrontendException {
@@ -374,50 +385,51 @@ public class PigServer {
             String msg = "setBatchOn() must be called first.";
             throw new FrontendException(msg, errCode, PigException.INPUT);
         }
-        
+
         currDAG = graphs.pop();
     }
-       
+
     /**
-     * Add a path to be skipped while automatically shipping binaries for 
+     * Add a path to be skipped while automatically shipping binaries for
      * streaming.
-     *  
+     *
      * @param path path to be skipped
      */
     public void addPathToSkip(String path) {
         pigContext.addPathToSkip(path);
     }
-    
+
     /**
      * Defines an alias for the given function spec. This
-     * is useful for functions that require arguments to the 
+     * is useful for functions that require arguments to the
      * constructor.
-     * 
+     *
      * @param function - the new function alias to define.
      * @param functionSpec - the name of the function and any arguments.
      * It should have the form: classname('arg1', 'arg2', ...)
      * @deprecated Use {@link #registerFunction(String, FuncSpec)}
      */
+    @Deprecated
     public void registerFunction(String function, String functionSpec) {
         registerFunction(function, new FuncSpec(functionSpec));
     }
-    
+
     /**
      * Defines an alias for the given function spec. This
-     * is useful for functions that require arguments to the 
+     * is useful for functions that require arguments to the
      * constructor.
-     * 
+     *
      * @param function - the new function alias to define.
-     * @param funcSpec - the FuncSpec object representing the name of 
+     * @param funcSpec - the FuncSpec object representing the name of
      * the function class and any arguments to constructor.
      */
     public void registerFunction(String function, FuncSpec funcSpec) {
         pigContext.registerFunction(function, funcSpec);
     }
-    
+
     /**
      * Defines an alias for the given streaming command.
-     * 
+     *
      * @param commandAlias - the new command alias to define
      * @param command - streaming command to be executed
      */
@@ -428,66 +440,66 @@ public class PigServer {
     private URL locateJarFromResources(String jarName) throws IOException {
         Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
         URL resourceLocation = null;
-        
+
         if (urls.hasMoreElements()) {
             resourceLocation = urls.nextElement();
         }
-        
+
         if (urls.hasMoreElements()) {
             StringBuffer sb = new StringBuffer("Found multiple resources that match ");
             sb.append(jarName);
             sb.append(": ");
             sb.append(resourceLocation);
-            
+
             while (urls.hasMoreElements()) {
                 sb.append(urls.nextElement());
                 sb.append("; ");
             }
-            
+
             log.debug(sb.toString());
         }
-    
+
         return resourceLocation;
     }
-    
+
     /**
-     * Registers a jar file. Name of the jar file can be an absolute or 
+     * Registers a jar file. Name of the jar file can be an absolute or
      * relative path.
-     * 
+     *
      * If multiple resources are found with the specified name, the
      * first one is registered as returned by getSystemResources.
      * A warning is issued to inform the user.
-     * 
+     *
      * @param name of the jar file to register
      * @throws IOException
      */
     public void registerJar(String name) throws IOException {
         // first try to locate jar via system resources
-        // if this fails, try by using "name" as File (this preserves 
-        // compatibility with case when user passes absolute path or path 
-        // relative to current working directory.)        
+        // if this fails, try by using "name" as File (this preserves
+        // compatibility with case when user passes absolute path or path
+        // relative to current working directory.)
         if (name != null) {
             URL resource = locateJarFromResources(name);
 
             if (resource == null) {
                 File f = FileLocalizer.fetchFile(pigContext.getProperties(), name).file;
-                
+
                 if (!f.canRead()) {
                     int errCode = 4002;
                     String msg = "Can't read jar file: " + name;
                     throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
                 }
-                
+
                 resource = f.toURI().toURL();
             }
 
-            pigContext.addJar(resource);        
+            pigContext.addJar(resource);
         }
     }
-    
+
     /**
      * Universal Scripting Language Support, see PIG-928
-     * 
+     *
      * @param path path of the script file
      * @param scriptingLang language keyword or scriptingEngine used to interpret the script
      * @param namespace namespace defined for functions of this script
@@ -509,21 +521,21 @@ public class PigServer {
         }
         pigContext.addScriptFile(path);
     }
-    
+
     /**
      * Register a query with the Pig runtime. The query is parsed and registered, but it is not
      * executed until it is needed.
-     * 
+     *
      * @param query
      *            a Pig Latin expression to be evaluated.
      * @param startLine
      *            line number of the query within the whole script
      * @throws IOException
-     */    
-    public void registerQuery(String query, int startLine) throws IOException {            
+     */
+    public void registerQuery(String query, int startLine) throws IOException {
         currDAG.registerQuery(query, startLine);
     }
- 
+
     public Graph getClonedGraph() throws IOException {
         Graph graph = currDAG.clone();
 
@@ -534,58 +546,73 @@ public class PigServer {
         }
         return graph;
     }
-    
+
     /**
      * Register a query with the Pig runtime. The query is parsed and registered, but it is not
      * executed until it is needed.  Equivalent to calling {@link #registerQuery(String, int)}
      * with startLine set to 1.
-     * 
+     *
      * @param query
      *            a Pig Latin expression to be evaluated.
      * @throws IOException
-     */    
+     */
     public void registerQuery(String query) throws IOException {
         registerQuery(query, 1);
     }
-    
+
     /**
-     * Register a query with the Pig runtime.  The query will be read from the indicated file.
-     * @param fileName file to read query from.
+     * Register a pig script from an InputStream source, which is more general and extensible:
+     * the pig script can be in a local file (use a FileInputStream), can be built
+     * dynamically in memory (use a ByteArrayInputStream), or can even reside on a
+     * remote machine (wrap it as a SocketInputStream).
+     * @param in InputStream to read the pig script from
      * @throws IOException
      */
-    public void registerScript(String fileName) throws IOException {
-        registerScript(fileName, null, null);
+    public void registerScript(InputStream in) throws IOException{
+        registerScript(in, null, null);
     }
-    
+
     /**
-     * Register a pig script file.  The parameters in the file will be substituted with the values in params
-     * @param fileName  pig script file
-     * @param params  the key is the parameter name, and the value is the parameter value
+     * Register a pig script from an InputStream source, which is more general and extensible:
+     * the pig script can be in a local file (use a FileInputStream), can be built
+     * dynamically in memory (use a ByteArrayInputStream), or can even reside on a
+     * remote machine (wrap it as a SocketInputStream).
+     * The parameters in the pig script will be substituted with the values in params.
+     * @param in InputStream to read the pig script from
+     * @param params the key is the parameter name, and the value is the parameter value
      * @throws IOException
      */
-    public void registerScript(String fileName, Map<String,String> params) throws IOException {
-        registerScript(fileName, params, null);
+    public void registerScript(InputStream in, Map<String,String> params) throws IOException{
+        registerScript(in, params, null);
     }
 
     /**
-     * Register a pig script file.  The parameters in the file will be substituted with the values in the parameter files
-     * @param fileName pig script file
+     * Register a pig script from an InputStream source, which is more general and extensible:
+     * the pig script can be in a local file (use a FileInputStream), can be built
+     * dynamically in memory (use a ByteArrayInputStream), or can even reside on a
+     * remote machine (wrap it as a SocketInputStream).
+     * The parameters in the pig script will be substituted with the values in the parameter files.
+     * @param in InputStream to read the pig script from
      * @param paramsFiles  files which have the parameter setting
      * @throws IOException
      */
-    public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
-        registerScript(fileName, null, paramsFiles);
+    public void registerScript(InputStream in, List<String> paramsFiles) throws IOException {
+        registerScript(in, null, paramsFiles);
     }
-    
+
     /**
-     * Register a pig script file.  The parameters in the file will be substituted with the values in the map and the parameter files
+     * Register a pig script from an InputStream.<br>
+     * The pig script can be in a local file (use a FileInputStream), can be built
+     * dynamically in memory (use a ByteArrayInputStream), or can even reside on a
+     * remote machine (wrap it as a SocketInputStream).<br>
+     * The parameters in the pig script will be substituted with the values in the map and the parameter files.
      * The values in params Map will override the value in parameter file if they have the same parameter
-     * @param fileName  pig script
-     * @param params  the key is the parameter name, and the value is the parameter value
-     * @param paramsFiles   files which have the parameter setting
+     * @param in InputStream to read the pig script from
+     * @param params the key is the parameter name, and the value is the parameter value
+     * @param paramsFiles  files which have the parameter setting
      * @throws IOException
      */
-    public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
+    public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
         try {
             // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
             List<String> paramList = new ArrayList<String>();
@@ -594,22 +621,19 @@ public class PigServer {
                     paramList.add(entry.getKey()+"="+entry.getValue());
                 }
             }
-            
+
             // do parameter substitution
             ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
             StringWriter writer = new StringWriter();
-            psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(new FileInputStream(fileName))), 
-                                   writer,  
-                                   paramList.size() > 0 ? paramList.toArray(new String[0]) : null, 
+            psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(in)),
+                                   writer,
+                                   paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
                                    paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
-            
+
             GruntParser grunt = new GruntParser(new StringReader(writer.toString()));
             grunt.setInteractive(false);
             grunt.setParams(this);
             grunt.parseStopOnError(true);
-        } catch (FileNotFoundException e) {
-            log.error(e.getLocalizedMessage());
-            throw new IOException(e.getCause());
         } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
             log.error(e.getLocalizedMessage());
             throw new IOException(e.getCause());
@@ -618,6 +642,61 @@ public class PigServer {
             throw new IOException(e.getCause());
         }
     }
+
+    /**
+     * Register a query with the Pig runtime.  The query will be read from the indicated file.
+     * @param fileName file to read query from.
+     * @throws IOException
+     */
+    public void registerScript(String fileName) throws IOException {
+        registerScript(fileName, null, null);
+    }
+
+    /**
+     * Register a pig script file.  The parameters in the file will be substituted with the values in params
+     * @param fileName  pig script file
+     * @param params  the key is the parameter name, and the value is the parameter value
+     * @throws IOException
+     */
+    public void registerScript(String fileName, Map<String,String> params) throws IOException {
+        registerScript(fileName, params, null);
+    }
+
+
+
+    /**
+     * Register a pig script file.  The parameters in the file will be substituted with the values in the parameter files
+     * @param fileName pig script file
+     * @param paramsFiles  files which have the parameter setting
+     * @throws IOException
+     */
+    public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
+        registerScript(fileName, null, paramsFiles);
+    }
+
+    /**
+     * Register a pig script file.  The parameters in the file will be substituted with the values in the map and the parameter files
+     * The values in params Map will override the value in parameter file if they have the same parameter
+     * @param fileName  pig script
+     * @param params  the key is the parameter name, and the value is the parameter value
+     * @param paramsFiles   files which have the parameter setting
+     * @throws IOException
+     */
+    public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
+        FileInputStream fis = null;
+        try{
+            fis = new FileInputStream(fileName);
+            registerScript(fis, params, paramsFiles);
+        }catch (FileNotFoundException e){
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        } finally {
+            if (fis != null) {
+                fis.close();
+            }
+        }
+    }
+
     /**
      * Intended to be used by unit tests only.
      * Print a list of all aliases in in the current Pig Latin script.  Output is written to
@@ -645,16 +724,19 @@ public class PigServer {
                     break;
                 }
             }
-            if (schema != null) System.out.println(alias + ": " + schema.toString());    
-            else System.out.println("Schema for " + alias + " unknown.");
+            if (schema != null) {
+                System.out.println(alias + ": " + schema.toString());
+            } else {
+                System.out.println("Schema for " + alias + " unknown.");
+            }
             return schema;
         } catch (FrontendException fee) {
             int errCode = 1001;
-            String msg = "Unable to describe schema for alias " + alias; 
+            String msg = "Unable to describe schema for alias " + alias;
             throw new FrontendException (msg, errCode, PigException.INPUT, false, null, fee);
         }
     }
-    
+
     /**
      * Write the schema for a nestedAlias to System.out. Denoted by alias::nestedAlias.
      * @param alias Alias whose schema has nestedAlias
@@ -671,7 +753,7 @@ public class PigServer {
         }
         else {
             int errCode = 1001;
-            String msg = "Unable to describe schema for " + alias + "::" + nestedAlias; 
+            String msg = "Unable to describe schema for " + alias + "::" + nestedAlias;
             throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
         }
     }
@@ -683,7 +765,7 @@ public class PigServer {
     public void setJobName(String name){
         currDAG.setJobName(name);
     }
-    
+
     /**
      * Set Hadoop job priority.  This value will get translated to mapred.job.priority.
      * @param priority valid values are found in {@link org.apache.hadoop.mapred.JobPriority}
@@ -725,10 +807,10 @@ public class PigServer {
             if (currDAG.isBatchOn()) {
                 currDAG.execute();
             }
-            
+
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
                     .toString(), Utils.getTmpFileCompressorName(pigContext) + "()");
-            
+
             // invocation of "execute" is synchronous!
 
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -739,7 +821,7 @@ public class PigServer {
                 Exception e = job.getException();
                 int errCode = 1066;
                 String msg = "Unable to open iterator for alias " + id +
-                ". Backend error : " + e.getMessage(); 
+                ". Backend error : " + e.getMessage();
                 throw new FrontendException(msg, errCode, PigException.INPUT, e);
             } else {
                 throw new IOException("Job terminated with anomalous status "
@@ -751,11 +833,11 @@ public class PigServer {
         }
         catch (Exception e) {
             int errCode = 1066;
-            String msg = "Unable to open iterator for alias " + id ; 
+            String msg = "Unable to open iterator for alias " + id ;
             throw new FrontendException(msg, errCode, PigException.INPUT, e);
         }
     }
-    
+
     /**
      * Executes a Pig Latin script up to and including indicated alias and stores the resulting
      * records into a file.  That is, if a user does:
@@ -775,7 +857,7 @@ public class PigServer {
      * </pre>
      * filtered and sorted data will be stored to the file <tt>bar</tt>.
      * Equivalent to calling {@link #store(String, String, String)} with
-     * <tt>org.apache.pig.PigStorage</tt> as the store function. 
+     * <tt>org.apache.pig.PigStorage</tt> as the store function.
      * @param id The alias to store
      * @param filename The file to which to store to
      * @return {@link ExecJob} containing information about this job
@@ -784,7 +866,7 @@ public class PigServer {
     public ExecJob store(String id, String filename) throws IOException {
         return store(id, filename, PigStorage.class.getName() + "()");   // SFPig is the default store function
     }
-        
+
     /**
      * Executes a Pig Latin script up to and including indicated alias and stores the resulting
      * records into a file.  That is, if a user does:
@@ -812,7 +894,7 @@ public class PigServer {
      * @return {@link ExecJob} containing information about this job
      * @throws IOException
      */
-    public ExecJob store(String id, String filename, String func) 
+    public ExecJob store(String id, String filename, String func)
             throws IOException {
         PigStats stats = storeEx(id, filename, func);
         if (stats.getOutputStats().size() < 1) {
@@ -826,19 +908,20 @@ public class PigServer {
         }else{
             HJob job = new HJob(JOB_STATUS.FAILED, pigContext,
                     output.getPOStore(), output.getAlias(), stats);
-            
+
             //check for exception
             Exception ex = null;
             for(JobStats js : stats.getJobGraph()){
-                if(js.getException() != null)
+                if(js.getException() != null) {
                     ex = js.getException();
+                }
             }
             job.setException(ex);
             return job;
         }
 
     }
-       
+
     private PigStats storeEx(
             String id,
             String filename,
@@ -853,7 +936,7 @@ public class PigServer {
 
             // MRCompiler needs a store to be the leaf - hence
             // add a store to the plan to explain
-            
+
             // figure out the leaf to which the store needs to be added
             List<LogicalOperator> leaves = lp.getLeaves();
             LogicalOperator leaf = null;
@@ -862,24 +945,25 @@ public class PigServer {
             } else {
                 for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
                     LogicalOperator leafOp = it.next();
-                    if(leafOp.getAlias().equals(id))
+                    if(leafOp.getAlias().equals(id)) {
                         leaf = leafOp;
+                    }
                 }
             }
-            
+
             LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(
                     scope, lp, filename, func, leaf, leaf.getAlias(),
                     pigContext);
             LogicalPlan storePlan = compileLp(unCompiledstorePlan, g, true);
-            
+
             return executeCompiledLogicalPlan(storePlan);
         } catch (PigException e) {
             int errCode = 1002;
             String msg = "Unable to store alias " + id;
             throw new PigException(msg, errCode, PigException.INPUT, e);
-        }   
+        }
     }
-    
+
     /**
      * Provide information on how a pig query will be executed.  For now
      * this information is very developer focussed, and probably not very
@@ -898,7 +982,7 @@ public class PigServer {
      * @param alias Name of alias to explain.
      * @param format Format in which the explain should be printed.  If text, then the plan will
      * be printed in plain text.  Otherwise, the execution plan will be printed in
-     * <a href="http://en.wikipedia.org/wiki/DOT_language">DOT</a> format. 
+     * <a href="http://en.wikipedia.org/wiki/DOT_language">DOT</a> format.
      * @param verbose Controls the amount of information printed
      * @param markAsExecute When set will treat the explain like a
      * call to execute in the respoect that all the pending stores are
@@ -931,7 +1015,7 @@ public class PigServer {
                 LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
                 migrator.visit();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
-                
+
                 HashSet<String> optimizerRules = null;
                 try {
                     optimizerRules = (HashSet<String>) ObjectSerializer
@@ -942,10 +1026,10 @@ public class PigServer {
                     String msg = "Unable to deserialize optimizer rules.";
                     throw new FrontendException(msg, errCode, PigException.BUG, ioe);
                 }
-                
+
                 LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(newPlan, 3, optimizerRules);
-                optimizer.optimize();                
-                
+                optimizer.optimize();
+
                 newPlan.explain(lps, format, verbose);
             }
             pp.explain(pps, format, verbose);
@@ -967,29 +1051,29 @@ public class PigServer {
      * not take into account a replication factor, as that can vary from file
      * to file. Thus if you are using this to determine if you data set will fit
      * in the HDFS, you need to divide the result of this call by your specific replication
-     * setting. 
+     * setting.
      * @return unused byte capacity of the file system.
      * @throws IOException
      */
     public long capacity() throws IOException {
         if (pigContext.getExecType() == ExecType.LOCAL) {
             throw new IOException("capacity only supported for non-local execution");
-        } 
+        }
         else {
             DataStorage dds = pigContext.getDfs();
-            
+
             Map<String, Object> stats = dds.getStatistics();
 
             String rawCapacityStr = (String) stats.get(DataStorage.RAW_CAPACITY_KEY);
             String rawUsedStr = (String) stats.get(DataStorage.RAW_USED_KEY);
-            
+
             if ((rawCapacityStr == null) || (rawUsedStr == null)) {
                 throw new IOException("Failed to retrieve capacity stats");
             }
-            
+
             long rawCapacityBytes = new Long(rawCapacityStr).longValue();
             long rawUsedBytes = new Long(rawUsedStr).longValue();
-            
+
             return rawCapacityBytes - rawUsedBytes;
         }
     }
@@ -1010,7 +1094,7 @@ public class PigServer {
 
         return length * replication;
     }
-    
+
     /**
      * Test whether a file exists.
      * @param filename to test
@@ -1021,7 +1105,7 @@ public class PigServer {
         ElementDescriptor elem = pigContext.getDfs().asElement(filename);
         return elem.exists();
     }
-    
+
     /**
      * Delete a file.
      * @param filename to delete
@@ -1033,7 +1117,7 @@ public class PigServer {
         elem.delete();
         return true;
     }
-    
+
     /**
      * Rename a file.
      * @param source file to rename
@@ -1045,7 +1129,7 @@ public class PigServer {
         pigContext.rename(source, target);
         return true;
     }
-    
+
     /**
      * Make a directory.
      * @param dirs directory to make
@@ -1057,8 +1141,8 @@ public class PigServer {
         container.create();
         return true;
     }
-    
-    /** 
+
+    /**
      * List the contents of a directory.
      * @param dir name of directory to list
      * @return array of strings, one for each file name
@@ -1068,16 +1152,16 @@ public class PigServer {
         Collection<String> allPaths = new ArrayList<String>();
         ContainerDescriptor container = pigContext.getDfs().asContainer(dir);
         Iterator<ElementDescriptor> iter = container.iterator();
-            
+
         while (iter.hasNext()) {
             ElementDescriptor elem = iter.next();
             allPaths.add(elem.toString());
         }
-            
+
         String[] type = new String[1];
         return allPaths.toArray(type);
     }
-    
+
     /**
      * Does not work at the moment.
      */
@@ -1086,7 +1170,7 @@ public class PigServer {
 //        return MapReduceLauncher.totalHadoopTimeSpent;
         return 0L;
     }
-  
+
     /**
      * Return a map containing the logical plan associated with each alias.
      * @return map
@@ -1128,7 +1212,7 @@ public class PigServer {
 
     public Map<Operator, DataBag> getExamples(String alias) throws IOException {
         LogicalPlan plan = null;
-        try {        
+        try {
             if (currDAG.isBatchOn() && alias != null) {
                 currDAG.execute();
             }
@@ -1157,11 +1241,11 @@ public class PigServer {
     private LogicalPlan getStorePlan(String alias) throws IOException {
         Graph g = getClonedGraph();
         LogicalPlan lp = g.getPlan(alias);
-        
+
         if (!isBatchOn() || alias != null) {
             // MRCompiler needs a store to be the leaf - hence
             // add a store to the plan to explain
-            
+
             // figure out the leaves to which stores need to be added
             List<LogicalOperator> leaves = lp.getLeaves();
             LogicalOperator leaf = null;
@@ -1170,20 +1254,21 @@ public class PigServer {
             } else {
                 for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
                     LogicalOperator leafOp = it.next();
-                    if(leafOp.getAlias().equals(alias))
+                    if(leafOp.getAlias().equals(alias)) {
                         leaf = leafOp;
+                    }
                 }
             }
-            
-            lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 
+
+            lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
                                                PigStorage.class.getName(), leaf, "fake", pigContext);
         }
-        
+
         compileLp(lp, g, true);
-        
+
         return lp;
     }
-    
+
     private PigStats execute(String alias) throws FrontendException, ExecException {
         LogicalPlan typeCheckedLp = compileLp(alias);
 
@@ -1199,7 +1284,7 @@ public class PigServer {
 
         return executeCompiledLogicalPlan(typeCheckedLp);
     }
-    
+
     private PigStats executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException, FrontendException {
         // discover pig features used in this script
         ScriptState.get().setScriptFeatures(compiledLp);
@@ -1234,12 +1319,12 @@ public class PigServer {
     private LogicalPlan compileLp(
             String alias,
             boolean optimize) throws FrontendException {
-        
+
         // create a clone of the logical plan and give it
         // to the operations below
         LogicalPlan lpClone;
         Graph g;
- 
+
         try {
             g = getClonedGraph();
             lpClone = g.getPlan(alias);
@@ -1250,7 +1335,7 @@ public class PigServer {
         }
         return compileLp(lpClone, g, optimize);
     }
-    
+
     private void mergeScalars(LogicalPlan lp, Graph g) throws FrontendException {
         // When we start processing a store we look for scalars to add stores
         // to respective logical plans and temporary files to the attributes
@@ -1269,13 +1354,13 @@ public class PigServer {
 
                 LogicalPlan referredPlan = g.getAliases().get(g.getAliasOp().get(alias));
 
-                // If referredPlan already has a store, 
+                // If referredPlan already has a store,
                 // we just use it instead of adding one from our pocket
                 store = referredPlan.getLeaves().get(0);
-                if(store instanceof LOStore 
+                if(store instanceof LOStore
                         &&
                         ((LOStore)store).getOutputFile().getFuncName().equals(
-                                InterStorage.class.getName())                                            
+                                InterStorage.class.getName())
                 ) {
                         // use this store
                         fileSpec = ((LOStore)store).getOutputFile();
@@ -1299,9 +1384,10 @@ public class PigServer {
 
                 innerPlan.add(rconst);
                 innerPlan.connect(rconst, scalarEntry.getKey());
-                
-                if (lp.getSoftLinkSuccessors(store)==null || !lp.getSoftLinkSuccessors(store).contains(scalarEntry.getValue().second))
+
+                if (lp.getSoftLinkSuccessors(store)==null || !lp.getSoftLinkSuccessors(store).contains(scalarEntry.getValue().second)) {
                     lp.createSoftLink(store, scalarEntry.getValue().second);
+                }
             }
         } catch (IOException ioe) {
             int errCode = 2219;
@@ -1309,28 +1395,28 @@ public class PigServer {
             throw new FrontendException(msg, errCode, PigException.BUG, ioe);
         }
     }
-    
+
     private LogicalPlan compileLp(LogicalPlan lp, Graph g, boolean optimize) throws FrontendException {
         mergeScalars(lp, g);
-        
+
         return compileLp(lp, optimize);
     }
-    
+
     @SuppressWarnings("unchecked")
     private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
     FrontendException {
         // Set the logical plan values correctly in all the operators
         PlanSetter ps = new PlanSetter(lp);
         ps.visit();
-        
+
         UnionOnSchemaSetter setUnionOnSchema = new UnionOnSchemaSetter(lp, pigContext);
         setUnionOnSchema.visit();
-        
+
         // run through validator
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
         boolean isBeforeOptimizer = true;
         validate(lp, collector, isBeforeOptimizer);
-        
+
         // optimize
         if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false")) {
             HashSet<String> optimizerRules = null;
@@ -1346,16 +1432,16 @@ public class PigServer {
 
             LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
             optimizer.optimize();
-            
+
             // compute whether output data is sorted or not
             SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
             sortInfoSetter.visit();
-            
+
             // run validations to be done after optimization
             isBeforeOptimizer = false;
             validate(lp, collector, isBeforeOptimizer);
         }
-        
+
         return lp;
     }
 
@@ -1372,16 +1458,16 @@ public class PigServer {
             boolean isBeforeOptimizer) throws FrontendException {
         FrontendException caught = null;
         try {
-            LogicalPlanValidationExecutor validator = 
+            LogicalPlanValidationExecutor validator =
                 new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
             validator.validate(lp, collector);
         } catch (FrontendException fe) {
             // Need to go through and see what the collector has in it.  But
             // remember what we've caught so we can wrap it into what we
             // throw.
-            caught = fe;            
+            caught = fe;
         }
-        
+
         if(aggregateWarning) {
             CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
         } else {
@@ -1389,7 +1475,7 @@ public class PigServer {
                 CompilationMessageCollector.logAllMessages(collector, log);
             }
         }
-        
+
         if (caught != null) {
             throw caught;
         }
@@ -1408,10 +1494,10 @@ public class PigServer {
             int errCode = 1005;
             String msg = "No plan for " + alias + " to " + operation;
             throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-        }        
+        }
         return lp;
     }
-    
+
     public static class SortInfoSetter extends LOVisitor{
 
         public SortInfoSetter(LogicalPlan plan) {
@@ -1420,14 +1506,14 @@ public class PigServer {
 
         @Override
         protected void visit(LOStore store) throws VisitorException {
-            
+
             LogicalOperator storePred = store.getPlan().getPredecessors(store).get(0);
             if(storePred == null){
                 int errCode = 2051;
                 String msg = "Did not find a predecessor for Store." ;
-                throw new VisitorException(msg, errCode, PigException.BUG);    
+                throw new VisitorException(msg, errCode, PigException.BUG);
             }
-            
+
             SortInfo sortInfo = null;
             if(storePred instanceof LOLimit) {
                 storePred = store.getPlan().getPredecessors(storePred).get(0);
@@ -1443,8 +1529,9 @@ public class PigServer {
                         Object value = ((LOConst)root).getValue();
                         if (value instanceof Boolean && (Boolean)value==true) {
                             LogicalOperator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
-                            if (split instanceof LOSplit)
+                            if (split instanceof LOSplit) {
                                 storePred = store.getPlan().getPredecessors(split).get(0);
+                            }
                         }
                     }
                 }
@@ -1466,38 +1553,38 @@ public class PigServer {
      * This class holds the internal states of a grunt shell session.
      */
     private class Graph {
-        
-        private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-        
-        private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
-        
-        private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
-       
-        private List<String> scriptCache = new ArrayList<String>();	
+
+        private final Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+
+        private final Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+
+        private final Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+
+        private final List<String> scriptCache = new ArrayList<String>();
 
         // the fileNameMap contains filename to canonical filename
         // mappings. This is done so we can reparse the cached script
         // and remember the translation (current directory might only
         // be correct during the first parse
         private Map<String, String> fileNameMap = new HashMap<String, String>();
-    
-        private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
-        
-        private Set<LOLoad> loadOps = new HashSet<LOLoad>();
+
+        private final Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
+
+        private final Set<LOLoad> loadOps = new HashSet<LOLoad>();
 
         private String jobName;
-        
+
         private String jobPriority;
 
-        private boolean batchMode;
+        private final boolean batchMode;
 
         private int processedStores;
 
         private int ignoreNumStores;
-        
+
         private LogicalPlan lp;
-        
-        Graph(boolean batchMode) { 
+
+        Graph(boolean batchMode) {
             this.batchMode = batchMode;
             this.processedStores = 0;
             this.ignoreNumStores = 0;
@@ -1505,25 +1592,25 @@ public class PigServer {
                                                                   PigContext.JOB_NAME_PREFIX+":DefaultJobName");
             this.lp = new LogicalPlan();
         };
-        
+
         Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
-        
+
         Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
-        
+
         Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
-        
+
         List<String> getScriptCache() { return scriptCache; }
-        
+
         boolean isBatchOn() { return batchMode; };
 
         boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
-        
+
         PigStats execute() throws ExecException, FrontendException {
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
             if (jobPriority != null) {
               pigContext.getProperties().setProperty(PigContext.JOB_PRIORITY, jobPriority);
             }
-            
+
             PigStats stats = PigServer.this.execute(null);
             processedStores = storeOpTable.keySet().size();
             return stats;
@@ -1543,7 +1630,7 @@ public class PigServer {
 
         LogicalPlan getPlan(String alias) throws IOException {
             LogicalPlan plan = lp;
-                
+
             if (alias != null) {
                 LogicalOperator op = aliasOp.get(alias);
                 if(op == null) {
@@ -1557,14 +1644,14 @@ public class PigServer {
         }
 
         void registerQuery(String query, int startLine) throws IOException {
-            
+
             LogicalPlan tmpLp = parseQuery(query, startLine);
-            
+
             // store away the query for use in cloning later
             scriptCache.add(query);
             if (tmpLp.getLeaves().size() == 1) {
                 LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
-                
+
                 // Check if we just processed a LOStore i.e. STORE
                 if (op instanceof LOStore) {
 
@@ -1596,17 +1683,17 @@ public class PigServer {
                     }
                 }
             }
-        }        
-    
-        LogicalPlan parseQuery(String query, int startLine) throws IOException {        
-            if (query == null || query.length() == 0) { 
+        }
+
+        LogicalPlan parseQuery(String query, int startLine) throws IOException {
+            if (query == null || query.length() == 0) {
                 int errCode = 1084;
                 String msg = "Invalid Query: Query is null or of size 0";
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
 
             query = query.trim();
-        
+
             try {
                 return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
                                               aliases, opTable, aliasOp, startLine, fileNameMap);
@@ -1623,7 +1710,7 @@ public class PigServer {
             // There are two choices on how we clone the logical plan
             // 1 - we really clone each operator and connect up the cloned operators
             // 2 - we cache away the script till the point we need to clone
-            // and then simply re-parse the script. 
+            // and then simply re-parse the script.
             // The latter approach is used here
             // FIXME: There is one open issue with this now:
             // Consider the following script:
@@ -1637,16 +1724,16 @@ public class PigServer {
             // checks for file existence of files in the load
             // in the case where the file is a local one -i.e. with file: prefix
             // This will be a known issue now and we will need to revisit later
-            
+
             // parse each line of the cached script
             int lineNumber = 1;
-            
-            // create data structures needed for parsing        
+
+            // create data structures needed for parsing
             Graph graph = new Graph(isBatchOn());
             graph.ignoreNumStores = processedStores;
             graph.processedStores = processedStores;
             graph.fileNameMap = fileNameMap;
-            
+
             try {
                 for (Iterator<String> it = getScriptCache().iterator(); it.hasNext(); lineNumber++) {
                     if (isBatchOn()) {
@@ -1659,17 +1746,17 @@ public class PigServer {
             } catch (IOException ioe) {
                 ioe.printStackTrace();
                 graph = null;
-            }          
+            }
             return graph;
         }
-       
+
         private void postProcess() throws IOException {
-            
+
             // Set the logical plan values correctly in all the operators
             PlanSetter ps = new PlanSetter(lp);
             ps.visit();
-            
-            // The following code deals with store/load combination of 
+
+            // The following code deals with store/load combination of
             // intermediate files. In this case we will replace the load operator
             // with a (implicit) split operator, iff the load/store
             // func is reversible (because that's when we can safely
@@ -1696,11 +1783,11 @@ public class PigServer {
                             String msg = "Failed to connect store with dependent load.";
                             throw new FrontendException(msg, errCode, ex);
                         }
-                        
 
-                         
+
+
                         //TODO
-                        //if the load has a schema then the type cast inserter has to introduce 
+                        //if the load has a schema then the type cast inserter has to introduce
                         //casts to get the right types. Since the type cast inserter runs later,
                         //removing the load could create problems. For example, if the storage function
                         //does not preserve type information required and the subsequent load created
@@ -1712,15 +1799,15 @@ public class PigServer {
                         //type information is preserved. Similarly, the load functions should support
                         //a similar interface. With these interfaces in place, the code below can be
                         //used to optimize the store/load combination
-                            
 
-                        /*                         
+
+                        /*
                         LoadFunc lFunc = (LoadFunc) pigContext.instantiateFuncFromSpec(load.getInputFile().getFuncSpec());
                         StoreFunc sFunc = (StoreFunc) pigContext.instantiateFuncFromSpec(store.getOutputFile().getFuncSpec());
                         if (lFunc.getClass() == sFunc.getClass() && lFunc instanceof ReversibleLoadStoreFunc) {
-                            
+
                             log.info("Removing unnecessary load operation from location: "+ifile);
-                            
+
                             // In this case we remember the input file
                             // spec in the store. We might have to use it
                             // in the MR compiler to recreate the load, if
@@ -1728,27 +1815,27 @@ public class PigServer {
                             store.setInputSpec(load.getInputFile());
 
                             LogicalOperator storePred = lp.getPredecessors(store).get(0);
-                            
+
                             // In this case we remember the input file
                             // spec in the store. We might have to use it
                             // in the MR compiler to recreate the load, if
                             // the store happens on a job boundary.
                             store.setInputSpec(load.getInputFile());
-                            
+
                             Schema storePredSchema = storePred.getSchema();
                             if(storePredSchema != null) {
                                 load.setSchema(storePredSchema);
-                                TypeCastInserter typeCastInserter = new TypeCastInserter(lp, LOLoad.class.getName());                                
+                                TypeCastInserter typeCastInserter = new TypeCastInserter(lp, LOLoad.class.getName());
                                 List<LogicalOperator> loadList = new ArrayList<LogicalOperator>();
                                 loadList.add(load);
                                 //the following needs a change to TypeCastInserter and LogicalTransformer
                                 typeCastInserter.doTransform(loadList, false);
                             }
-                            
+
                             lp.disconnect(store, load);
                             lp.connect(storePred, load);
                             lp.removeAndReconnectMultiSucc(load);
-                            
+
                             List<LogicalOperator> succs = lp.getSuccessors(load);
                         } else {
                             try {
@@ -1757,7 +1844,7 @@ public class PigServer {
                                 int errCode = 2128;
                                 String msg = "Failed to connect store with dependent load.";
                                 throw new FrontendException(msg, errCode, ex);
-                            }    
+                            }
                         }
                         */
                     }

Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Sat Jan  8 21:13:46 2011
@@ -21,6 +21,7 @@ package org.apache.pig.test;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -32,6 +33,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -47,6 +49,7 @@ import junit.framework.TestCase;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -624,6 +627,51 @@ public class TestPigServer extends TestC
         }
     }
     
+    // build the pig script from in-memory, and wrap it as ByteArrayInputStream
+    @Test
+    public void testRegisterScriptFromStream() throws Exception{
+        // using params map
+        PigServer pig=new PigServer(ExecType.LOCAL);
+        Map<String,String> params=new HashMap<String, String>();
+        params.put("input", "test/org/apache/pig/test/data/passwd");
+        String script="a = load '$input' using PigStorage(':');";
+        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params);
+        Iterator<Tuple> iter=pig.openIterator("a");
+        int index=0;
+        List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":");
+        while(iter.hasNext()){
+            Tuple tuple=iter.next();
+            assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+            index++;
+        }
+        
+        // using param file
+        pig=new PigServer(ExecType.LOCAL);
+        List<String> paramFile=new ArrayList<String>();
+        paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath());
+        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),paramFile);
+        iter=pig.openIterator("a");
+        index=0;
+        expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":");
+        while(iter.hasNext()){
+            Tuple tuple=iter.next();
+            assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+            index++;
+        }
+        
+        // using both param value and param file, param value should override param file
+        pig=new PigServer(ExecType.LOCAL);
+        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params,paramFile);
+        iter=pig.openIterator("a");
+        index=0;
+        expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":");
+        while(iter.hasNext()){
+            Tuple tuple=iter.next();
+            assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+            index++;
+        }
+    }
+    
     @Test
     public void testPigProperties() throws Throwable {
         File propertyFile = new File("pig.properties");