You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/27 17:54:22 UTC

svn commit: r641888 - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/p...

Author: olga
Date: Thu Mar 27 09:54:10 2008
New Revision: 641888

URL: http://svn.apache.org/viewvc?rev=641888&view=rev
Log:
PIG-94: M2 - I/O, ship/cache, error handling

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java
    incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java
    incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar 27 09:54:10 2008
@@ -181,3 +181,6 @@
     PIG-154: moving parsing for DEFINE and STORE into QueryParser
 
     PIG-100: improved error handling
+
+    PIG-94: changes for M2 of streaming: input/output, ship/cache, error
+    handling

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Thu Mar 27 09:54:10 2008
@@ -54,6 +54,7 @@
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.WrappedIOException;
 
 
@@ -147,6 +148,16 @@
     }
     
     /**
+     * Add a path to be skipped while automatically shipping binaries for 
+     * streaming.
+     *  
+     * @param path path to be skipped
+     */
+    public void addPathToSkip(String path) {
+        pigContext.addPathToSkip(path);
+    }
+    
+    /**
      * Defines an alias for the given function spec. This
      * is useful for functions that require arguments to the 
      * constructor.
@@ -157,6 +168,16 @@
      */
     public void registerFunction(String function, String functionSpec) {
         pigContext.registerFunction(function, functionSpec);
+    }
+    
+    /**
+     * Defines an alias for the given streaming command.
+     * 
+     * @param commandAlias - the new command alias to define
+     * @param command - streaming command to be executed
+     */
+    public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
+        pigContext.registerStreamCmd(commandAlias, command);
     }
     
     private URL locateJarFromResources(String jarName) throws IOException {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Mar 27 09:54:10 2008
@@ -112,6 +112,9 @@
     }
 
     public void init() throws ExecException {
+        // Copy over necessary configuration from PigContext
+        updateConfiguration(pigContext.getConf());
+        
         //First set the ssh socket factory
         setSSHFactory();
         
@@ -164,6 +167,7 @@
         }
 
         try {
+            // Set job-specific configuration knobs
             jobClient = new JobClient(new JobConf(conf.getConfiguration()));
         }
         catch (IOException e) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Thu Mar 27 09:54:10 2008
@@ -21,6 +21,7 @@
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,6 +56,7 @@
     public Class<WritableComparator> userComparator = null;
     public String quantilesFile = null;
     public PigContext pigContext;
+    public Properties properties = new Properties();
     
     public OperatorKey sourceLogicalKey;
     
@@ -244,6 +246,8 @@
             toMap.set(i, spec);
         else
             toMap.set(i, toMap.get(i).addSpec(spec));
+        
+        properties.putAll(spec.getProperties());
     }
     
     public void addReduceSpec(EvalSpec spec){
@@ -251,6 +255,8 @@
             toReduce = spec;
         else
             toReduce = toReduce.addSpec(spec);
+        
+        properties.putAll(spec.getProperties());
     }
     
     public void visit(POVisitor v) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Mar 27 09:54:10 2008
@@ -19,13 +19,22 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
+import java.util.Map.Entry;
 
+import org.apache.commons.httpclient.URIException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobClient;
@@ -67,6 +76,8 @@
     public static Configuration config = null;
     public static HExecutionEngine execEngine = null;
 
+    public static final String LOG_DIR = "_logs";
+    
     public static void setConf(Configuration configuration) {
         config = configuration;
     }
@@ -116,6 +127,7 @@
      */
     public boolean launchPig(POMapreduce pom) throws IOException {
         JobConf conf = new JobConf(config);
+        setJobProperties(conf, pom);
         conf.setJobName(pom.pigContext.getJobName());
         boolean success = false;
         List<String> funcs = new ArrayList<String>();
@@ -132,11 +144,24 @@
             funcs.addAll(pom.toReduce.getFuncs());
         }
         
+        String shipFiles = 
+            pom.properties.getProperty("pig.streaming.ship.files");
+        List<String> files = new ArrayList<String>(); 
+        if (shipFiles != null) {
+            String[] paths = shipFiles.split(",");
+            for (String path : paths) {
+                path = path.trim();
+                if (path.length() > 0) {
+                    files.add(path.trim());
+                }
+            }
+        }
+
         // create jobs.jar locally and pass it to hadoop
         File submitJarFile = File.createTempFile("Job", ".jar");    
         try {
             FileOutputStream fos = new FileOutputStream(submitJarFile);
-            JarManager.createJar(fos, funcs, pom.pigContext);
+            JarManager.createJar(fos, funcs, files, pom.pigContext);
             log.debug("Job jar size = " + submitJarFile.length());
             conf.setJar(submitJarFile.getPath());
             String user = System.getProperty("user.name");
@@ -193,6 +218,41 @@
             conf.setOutputPath(new Path(pom.outputFileSpec.getFileName()));
             conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec());
 
+            // Setup the DistributedCache for this job
+            DistributedCache.createSymlink(conf);
+            
+            String cacheFiles = 
+                pom.properties.getProperty("pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                String[] paths = cacheFiles.split(",");
+                
+                for (String path : paths) {
+                    path = path.trim();
+                    if (path.length() != 0) {
+                        URI uri = null;
+                        try {
+                            uri = new URI(path);
+                        } catch (URISyntaxException ue) {
+                            throw new IOException("Invalid cache specification, file doesn't exist: " + path);
+                        }
+                        DistributedCache.addCacheFile(uri, conf);
+                    }
+                }
+            }
+
+            //TODO - Remove this
+            conf.setBoolean("keep.failed.task.files", true);
+            
+            // Setup the logs directory for this job
+            String jobOutputFileName = pom.pigContext.getJobOutputFile();
+            if (jobOutputFileName != null && jobOutputFileName.length() > 0) {
+                Path jobOutputFile = new Path(pom.pigContext.getJobOutputFile());
+                conf.set("pig.output.dir", 
+                        jobOutputFile.getParent().toString());
+                conf.set("pig.streaming.log.dir", 
+                        new Path(jobOutputFile, LOG_DIR).toString());
+            }
+            
             //
             // Now, actually submit the job (using the submit name)
             //
@@ -316,6 +376,7 @@
         }
         catch (Exception e) {
             // Do we need different handling for different exceptions
+            e.printStackTrace();
             throw WrappedIOException.wrap(e);
         }
         finally {
@@ -324,6 +385,20 @@
         return success;
     }
 
+    /**
+     * Copy the job-specific properties collected by the given
+     * <code>POMapreduce</code> (e.g. streaming ship/cache settings) into
+     * the job configuration; keys and values are expected to be Strings.
+     * @param job job configuration to update
+     * @param pom <code>POMapreduce</code> to be executed
+     */
+    private static void setJobProperties(JobConf job, POMapreduce pom) 
+    throws IOException {
+        for (Map.Entry<Object, Object> property : pom.properties.entrySet()) {
+            job.set((String)property.getKey(), (String)property.getValue());
+        }
+    }
+    
 private void getErrorMessages(TaskReport reports[], String type)
 {
     for (int i = 0; i < reports.length; i++) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Mar 27 09:54:10 2008
@@ -65,7 +65,7 @@
                 String evalSpec = job.get("pig.combineFunc", "");
                 EvalSpec esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec);
                 if(esp != null) esp.instantiateFunc(pigContext);
-                evalPipe = esp.setupPipe(finalout);
+                evalPipe = esp.setupPipe(null, finalout);
                 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
                 
                 bags = new DataBag[inputCount];

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Thu Mar 27 09:54:10 2008
@@ -187,7 +187,11 @@
             }
             myReader.set(this);
             loader.bindTo(split.getPath().toString(), new BufferedPositionedInputStream(is, start), start, end);
-            
+         
+            // Mimic FileSplit
+            job.set("map.input.file", split.getPath().toString());
+            job.setLong("map.input.start", split.getStart());
+            job.setLong("map.input.length", split.getLength());
         }
 
         public JobConf getJobConf(){

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Thu Mar 27 09:54:10 2008
@@ -23,6 +23,7 @@
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -84,6 +85,7 @@
     public static Reporter reporter = null;
 
     JobConf                           job;
+    private Properties                properties = new Properties();
     private DataCollector             evalPipe;
     private OutputCollector           oc;
     private EvalSpec                  group;
@@ -104,7 +106,7 @@
 
         oc = output;
 
-        setupMapPipe(reporter);
+        setupMapPipe(properties, reporter);
 
         // allocate key & value instances that are re-used for all entries
         WritableComparable key = input.createKey();
@@ -127,7 +129,7 @@
         try {
             oc = output;
             if (evalPipe == null) {
-                setupReducePipe();
+                setupReducePipe(properties);
             }
 
             DataBag[] bags = new DataBag[inputCount];
@@ -158,11 +160,29 @@
         }
     }
 
-    /**
-     * Just save off the PigJobConf for later use.
-     */
-    public void configure(JobConf job) {
-        this.job = job;
+    /** Save the JobConf and set up the PigContext and <code>properties</code>. */
+    public void configure(JobConf jobConf) {
+        job = jobConf;
+        // Restore the PigContext serialized under "pig.pigContext"
+        try {
+            pigContext = 
+                (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        } catch (IOException ioe) {
+            log.fatal("Failed to deserialize PigContext", ioe);
+            throw new RuntimeException(ioe);
+        }
+        pigContext.setJobConf(job);
+        // Copy JobConf settings into <code>properties</code> for the Eval
+        // pipeline; missing keys get defaults since Properties rejects nulls
+        properties.setProperty("pig.output.dir", 
+                               jobConf.get("pig.output.dir", ""));
+        properties.setProperty("pig.streaming.log.dir", 
+                               jobConf.get("pig.streaming.log.dir", "_logs"));
+        properties.setProperty("pig.streaming.task.id", 
+                               jobConf.get("mapred.task.id", ""));
+        properties.setProperty("pig.streaming.task.output.dir", 
+                               jobConf.getOutputPath() == null ? "" :
+                               jobConf.getOutputPath().toString());
+    }
 
     /**
@@ -191,9 +211,9 @@
         return taskidParts[taskidParts.length - 2];
     }
     
-    private void setupMapPipe(Reporter reporter) throws IOException {
+    private void setupMapPipe(Properties properties, Reporter reporter) 
+    throws IOException {
         PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
         index = split.getIndex();
         EvalSpec evalSpec = split.getEvalSpec();
         
@@ -216,11 +236,13 @@
             }else{
                 oc = getSplitCollector(splitSpec);    
             }
-            evalPipe = evalSpec.setupPipe(new MapDataOutputCollector());
+            evalPipe = evalSpec.setupPipe(properties, 
+                                          new MapDataOutputCollector());
         } else {
             group = groupSpec;
-            DataCollector groupInput = group.setupPipe(new MapDataOutputCollector());
-            evalPipe = evalSpec.setupPipe(groupInput);
+            DataCollector groupInput = 
+                group.setupPipe(properties, new MapDataOutputCollector());
+            evalPipe = evalSpec.setupPipe(properties, groupInput);
         }
         
     }
@@ -248,8 +270,7 @@
         };
     }
     
-    private void setupReducePipe() throws IOException {
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+    private void setupReducePipe(Properties properties) throws IOException {
         EvalSpec evalSpec = (EvalSpec)ObjectSerializer.deserialize(job.get("pig.reduceFunc", ""));
         
         if (evalSpec == null) 
@@ -271,7 +292,8 @@
             oc = getSplitCollector(splitSpec);
         }
     
-        evalPipe = evalSpec.setupPipe(new ReduceDataOutputCollector());
+        evalPipe = evalSpec.setupPipe(properties, 
+                                      new ReduceDataOutputCollector());
         
         inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
 

Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java Thu Mar 27 09:54:10 2008
@@ -79,7 +79,8 @@
                 }
             };
             
-            DataCollector inputToSpec = specs.get(i).setupPipe(outputFromSpec);
+            DataCollector inputToSpec = specs.get(i).setupPipe(null, 
+                                                               outputFromSpec);
 
             Tuple t;            
             while ((t = (Tuple) ((PhysicalOperator)opTable.get(inputs[i])).getNext()) != null) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java Thu Mar 27 09:54:10 2008
@@ -74,7 +74,7 @@
         if (buf==null)
             buf = new DataBuffer();
         if (evalPipeline == null)
-            evalPipeline = spec.setupPipe(buf);
+            evalPipeline = spec.setupPipe(null, buf);
             
         inputDrained = false;
         

Modified: incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Mar 27 09:54:10 2008
@@ -39,6 +39,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.Main;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -50,8 +51,11 @@
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
 import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.WrappedIOException;
 
@@ -91,15 +95,30 @@
    
     private String jobName = JOB_NAME_PREFIX;    // can be overwritten by users
   
+    // Pig Script Output
+    private String jobOutputFile = "";
+    
+    // JobConf of the current Map-Reduce job (transient: JobConf isn't Serializable)
+    transient JobConf jobConf;
+    
     /**
      * a table mapping function names to function specs.
      */
     private Map<String, String> definedFunctions = new HashMap<String, String>();
     
+    /**
+     * a table mapping names to streaming commands.
+     */
+    private Map<String, StreamingCommand> definedCommands = 
+        new HashMap<String, StreamingCommand>();
+    
     private static ArrayList<String> packageImportList = new ArrayList<String>();
 
     public boolean                       debug       = true;
     
+    // List of paths skipped for automatic shipping
+    List<String> skippedShipPaths = new ArrayList<String>();
+    
     public PigContext() {
         this(ExecType.MAPREDUCE);
     }
@@ -118,6 +137,14 @@
         }
         
         executionEngine = null;
+        
+        // Add the default paths to be skipped for auto-shipping of commands
+        skippedShipPaths.add("/bin");
+        skippedShipPaths.add("/usr/bin");
+        skippedShipPaths.add("/usr/local/bin");
+        skippedShipPaths.add("/sbin");
+        skippedShipPaths.add("/usr/sbin");
+        skippedShipPaths.add("/usr/local/sbin");
     }
 
     static{
@@ -341,6 +368,22 @@
     }
 
     /**
+     * Defines an alias for the given streaming command.
+     * 
+     * This is useful for complicated streaming command specs.
+     * 
+     * @param alias - the new command alias to define.
+     * @param command - the command 
+     */
+    public void registerStreamCmd(String alias, StreamingCommand command) {
+        if (command == null) {
+            definedCommands.remove(alias);
+        } else {
+            definedCommands.put(alias, command);
+        }
+    }
+
+    /**
      * Returns the type of execution currently in effect.
      * 
      * @return
@@ -489,7 +532,103 @@
             return instantiateFuncFromSpec(alias);
     }
 
+    /**
+     * Get the {@link StreamingCommand} for the given alias.
+     * 
+     * @param alias the alias for the <code>StreamingCommand</code>
+     * @return <code>StreamingCommand</code> for the alias
+     */
+    public StreamingCommand getCommandForAlias(String alias) {
+        return definedCommands.get(alias);
+    }
+    
     public void setExecType(ExecType execType) {
         this.execType = execType;
+    }
+    
+    /**
+     * Create a new {@link ExecutableManager} depending on the ExecType.
+     * 
+     * @return a new {@link ExecutableManager} depending on the ExecType
+     * @throws ExecException if the ExecType is neither LOCAL nor MAPREDUCE
+     */
+    public ExecutableManager createExecutableManager() throws ExecException {
+        ExecutableManager executableManager = null;
+
+        switch (execType) {
+            case LOCAL:
+            {
+                executableManager = new ExecutableManager();
+            }
+            break;
+            case MAPREDUCE: 
+            {
+                executableManager = new HadoopExecutableManager();
+            }
+            break;
+            default:
+            {
+                throw new ExecException("Unknown execType: " + execType);
+            }
+        }
+        
+        return executableManager;
+    }
+
+    /**
+     * Get the output file for the current Pig Script.
+     * 
+     * @return the output file for the current Pig Script
+     */
+    public String getJobOutputFile() {
+        return jobOutputFile;
+    }
+
+    /**
+     * Set the output file for the current Pig Script.
+     * 
+     * @param jobOutputFile the output file for the current Pig Script
+     */
+    public void setJobOutputFile(String jobOutputFile) {
+        this.jobOutputFile = jobOutputFile;
+    }
+
+    /**
+     * Get the <code>JobConf</code> of the current Map-Reduce job.
+     * 
+     * @return the <code>JobConf</code> of the current Map-Reduce job
+     */
+    public JobConf getJobConf() {
+        return jobConf;
+    }
+
+    /**
+     * Set the <code>JobConf</code> of the current Map-Reduce job.
+     * 
+     * @param jobConf the <code>JobConf</code> of the current Map-Reduce job
+     */
+    public void setJobConf(JobConf jobConf) {
+        this.jobConf = jobConf;
+    }
+    
+    /**
+     * Add a path to be skipped while automatically shipping binaries for 
+     * streaming.
+     *  
+     * @param path path to be skipped
+     */
+    public void addPathToSkip(String path) {
+        skippedShipPaths.add(path);
+    }
+    
+    /**
+     * Get paths which are to skipped while automatically shipping binaries for
+     * streaming.
+     * 
+     * @return paths which are to skipped while automatically shipping binaries 
+     *         for streaming
+     */
+    public List<String> getPathsToSkip() {
+        return skippedShipPaths;
     }
 }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
@@ -68,7 +69,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe){
             @Override
             public void add(Datum d) {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -43,9 +44,10 @@
     }
         
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe){
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe){
         for (int i=specs.size()-1; i>=0; i--){
-            endOfPipe = specs.get(i).setupDefaultPipe(endOfPipe);
+            endOfPipe = specs.get(i).setupDefaultPipe(properties, endOfPipe);
         }
         return endOfPipe;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Thu Mar 27 09:54:10 2008
@@ -21,6 +21,7 @@
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.data.Datum;
@@ -45,13 +46,15 @@
     private String comparatorFuncName;
     private transient Comparator<Tuple> comparator;
     
+    protected Properties properties = new Properties();
+    
     /*
      * Keep a precomputed pipeline ready if we do simple evals
      * No separate code path for simple evals as earlier 
      */
     private void init(){
         simpleEvalOutput = new DataBuffer();
-        simpleEvalInput = setupPipe(simpleEvalOutput);
+        simpleEvalInput = setupPipe(properties, simpleEvalOutput);
     }
     
     public class UserComparator implements Comparator<Tuple> {
@@ -88,20 +91,24 @@
     /**
      * set up a default data processing pipe for processing by this spec
      * This pipe does not include unflattening/flattening at the end
+     * @param properties properties for the pipe
      * @param endOfPipe The collector where output is desired
      * @return The collector where input tuples should be put
      */
-    protected abstract DataCollector setupDefaultPipe(DataCollector endOfPipe);
+    protected abstract DataCollector setupDefaultPipe(Properties properties,
+                                                      DataCollector endOfPipe);
     
     
     /**
      * set up a data processing pipe with flattening/unflattening at the end
      * based on the isFlattened field
      * 
+     * @param properties properties for the <code>EvalSpec</code>
      * @param endOfPipe where the output is desired 
      * @return The collector where input tuples should be put
      */
-    public DataCollector setupPipe(DataCollector endOfPipe){
+    public DataCollector setupPipe(Properties properties, 
+                                   DataCollector endOfPipe){
         /*
          * By default tuples flow through the eval pipeline in a flattened fashion
          * Thus if flatten is true, we use the default setup pipe method, otherwise we add 
@@ -110,10 +117,10 @@
     
         if (isFlattened){
             FlattenCollector fc = new FlattenCollector(endOfPipe);
-            return setupDefaultPipe(fc);
+            return setupDefaultPipe(properties, fc);
         }else{
             UnflattenCollector uc = new UnflattenCollector(endOfPipe);
-            return setupDefaultPipe(uc);
+            return setupDefaultPipe(properties, uc);
         }
     }
     
@@ -245,6 +252,15 @@
         this.inner = inner;
     }
 
+    /**
+     * Get properties specific to a given <code>EvalSpec</code>.
+     * 
+     * @return properties specific to a given <code>EvalSpec</code>
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+    
     public abstract void visit(EvalSpecVisitor v);
     
 }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
@@ -48,7 +49,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe) {
 
             @Override

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.Algebraic;
@@ -81,7 +82,8 @@
     }
 
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe){
             private Datum getPlaceHolderForFuncOutput(){
                 Type returnType = func.getReturnType();

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java Thu Mar 27 09:54:10 2008
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -63,7 +64,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe){
             LinkedList<CrossProductItem> pendingCrossProducts = new LinkedList<CrossProductItem>();
             
@@ -154,7 +156,7 @@
                     continue;
                 toBeCrossed[i] = new DatumBag();
 
-                specs.get(i).setupPipe(toBeCrossed[i]).add(cpiInput);
+                specs.get(i).setupPipe(properties, toBeCrossed[i]).add(cpiInput);
             }
         }
         
@@ -258,7 +260,7 @@
         }
         
         public void exec(){
-            specs.get(driver).setupPipe(this).add(cpiInput);
+            specs.get(driver).setupPipe(properties, this).add(cpiInput);
             //log.error(Thread.currentThread().getName() + ": Executing driver on " + cpiInput);
             successor.markStale(false);
         }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -18,6 +18,8 @@
 package org.apache.pig.impl.eval;
 
 
+import java.util.Properties;
+
 import org.apache.pig.data.Datum;
 import org.apache.pig.impl.eval.collector.DataCollector;
 
@@ -25,7 +27,8 @@
 public abstract class SimpleEvalSpec extends EvalSpec {
 
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe){
             @Override
             public void add(Datum d) {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -56,7 +57,8 @@
     }
 
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe){
             
             @Override

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,7 +39,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return endOfPipe;
     }
     

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Thu Mar 27 09:54:10 2008
@@ -19,16 +19,16 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.impl.streaming.PigExecutableManager;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.data.Datum;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -38,21 +38,35 @@
     private static final Log LOG = 
         LogFactory.getLog(StreamSpec.class.getName());
 
-    private String streamingCommand;                       // Actual command to be run
-    private String deserializer;                           // LoadFunc to be used
-    private String serializer;                             // StoreFunc to be used
-
-    public StreamSpec(String streamingCommand) {
-        this.streamingCommand = streamingCommand;
-        this.deserializer = PigStorage.class.getName();
-        this.serializer = PigStorage.class.getName();
-    }
+    private String executableManager;               // ExecutableManager to use
+    private StreamingCommand command;               // Actual command to be run
 
-    public StreamSpec(String streamingCommand, 
-                      String deserializer, String serializer) {
-        this.streamingCommand = streamingCommand;
-        this.deserializer = deserializer;
-        this.serializer = serializer;
+    public StreamSpec(ExecutableManager executableManager, 
+                      StreamingCommand command) {
+        this.executableManager = executableManager.getClass().getName();
+        this.command = command;
+
+        // Setup streaming-specific properties
+        if (command.getShipFiles()) {
+            parseShipCacheSpecs(command.getShipSpecs(), 
+                                properties, "pig.streaming.ship.files");
+        }
+        parseShipCacheSpecs(command.getCacheSpecs(), 
+                            properties, "pig.streaming.cache.files");
+    }
+    
+    private static void parseShipCacheSpecs(List<String> specs, 
+            Properties properties, String property) {
+        // Setup streaming-specific properties
+        StringBuffer sb = new StringBuffer();
+        Iterator<String> i = specs.iterator();
+        while (i.hasNext()) {
+            sb.append(i.next());
+            if (i.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        properties.setProperty(property, sb.toString());        
     }
     
     @Override
@@ -66,15 +80,11 @@
         return null;
     }
 
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-        return new StreamDataCollector(streamingCommand,
-                                       (deserializer == null) ? new PigStorage() :
-                                         (LoadFunc)PigContext.instantiateFuncFromSpec(
-                                                                        deserializer),            
-                                       (serializer == null) ? new PigStorage() :
-                                         (StoreFunc)PigContext.instantiateFuncFromSpec(
-                                                                        serializer),
-                                      endOfPipe);
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
+        return new StreamDataCollector(properties,
+                                       (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManager), 
+                                       command, endOfPipe);
     }
 
     public void visit(EvalSpecVisitor v) {
@@ -82,16 +92,18 @@
     }
 
     /**
-     * A simple {@link DataCollector} which wraps a {@link PigExecutableManager}
+     * A simple {@link DataCollector} which wraps a {@link ExecutableManager}
      * and lets it handle the input and the output to the managed executable.
      */
     private static class StreamDataCollector extends DataCollector {
-        PigExecutableManager executable;                          //Executable manager
+        ExecutableManager executableManager;            //Executable manager
         
-        public StreamDataCollector(String streamingCommand,
-                                   LoadFunc deserializer, StoreFunc serializer,
+        public StreamDataCollector(Properties properties,
+                                   ExecutableManager executableManager,
+                                   StreamingCommand command,
                                    DataCollector endOfPipe) {
             super(endOfPipe);
+            this.executableManager = executableManager;
 
             DataCollector successor = 
                 new DataCollector(endOfPipe) {
@@ -102,33 +114,32 @@
             };
 
             try {
-                // Create the PigExecutableManager
-                executable = new PigExecutableManager(streamingCommand, 
-                                                      deserializer, serializer, 
-                                                      successor);
-                
-                executable.configure();
+                // Setup the ExecutableManager
+                this.executableManager.configure(properties, command, successor);
                 
                 // Start the executable
-                executable.run();
+                this.executableManager.run();
             } catch (Exception e) {
-                LOG.fatal("Failed to create/start PigExecutableManager with: " + e);
+                LOG.fatal("Failed to create/start PigExecutableManager with: " + 
+                          e);
+                e.printStackTrace();
                 throw new RuntimeException(e);
             }
         }
 
         public void add(Datum d) {
             try {
-                executable.add(d);
+                executableManager.add(d);
             } catch (IOException ioe) {
-                LOG.fatal("executable.add(" + d + ") failed with: " + ioe);
+                LOG.fatal("ExecutableManager.add(" + d + ") failed with: " + 
+                          ioe);
                 throw new RuntimeException(ioe);
             }
         }
 
         protected void finish() {
             try {
-                executable.close();
+                executableManager.close();
             }
             catch (Exception e) {
                 LOG.fatal("Failed to close PigExecutableManager with: " + e);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.TimestampedTuple;
@@ -40,7 +41,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe) {
 
             @Override

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java Thu Mar 27 09:54:10 2008
@@ -37,7 +37,8 @@
     }
     
     @Override
-    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
         return new DataCollector(endOfPipe) {
 
             @Override

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Mar 27 09:54:10 2008
@@ -44,6 +44,8 @@
 import org.apache.pig.builtin.*;
 import org.apache.pig.impl.builtin.*;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 public class QueryParser {
 	private PigContext pigContext;
@@ -241,9 +243,121 @@
 	 void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, Cond cond, int index){
 		splitOp.addCond(cond);
 		LOSplitOutput splitOut = new LOSplitOutput(opTable, scope, getNextId(), lp.getRoot(), index);
-		aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
+	    aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
 	 }
 
+     // Check and set files to be automatically shipped for the given StreamingCommand
+     // Auto-shipping rules:
+     // 1. If the command begins with either perl or python, assume that the 
+     //    binary is the first non-quoted string it encounters that does not 
+     //    start with a dash - subject to the restrictions in (2).
+     // 2. Otherwise, attempt to ship the first string from the command line as 
+     //    long as it does not come from /bin, /usr/bin or /usr/local/bin. 
+     //    This is determined by scanning the path if an absolute path is 
+     //    provided, or by executing "which". The paths can be made configurable 
+     //    via the "set stream.skippath <paths>" option.
+     private static final String PERL = "perl";
+     private static final String PYTHON = "python";
+     private void checkAutoShipSpecs(StreamingCommand command, String[] argv) {
+     	String arg0 = argv[0];
+     	// Check if command is perl or python ...
+        if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
+            for (int i=1; i < argv.length; ++i) {
+            	if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
+            		command.addPathToShip(argv[i]);
+            		command.setExecutable(argv[i]);
+            		break;
+            	}
+            }
+        } else {
+        	// Ship the first argument if it can be ...
+        	boolean absPath = arg0.startsWith("/");
+        	String filePath =  (absPath) ? arg0 : which(arg0);
+        	if (filePath.length() > 0 && checkAndShip(command, filePath)) {
+        		// Make it relative to task's cwd
+        		String runtimeExecutablePath = (absPath) ? ("." + filePath) : filePath; 
+        		argv[0] = runtimeExecutablePath;
+        		command.setCommandArgs(argv);
+        		command.setExecutable(runtimeExecutablePath);
+        		
+        		// Ship the file 
+                command.addPathToShip(filePath);
+        	}
+        }
+     }
+     
+     private boolean isQuotedString(String s) {
+     	return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\'');
+     }
+     
+     private boolean checkAndShip(StreamingCommand command, String file) {
+     	// Check if file is in the paths to be skipped 
+     	for (String skipPath : pigContext.getPathsToSkip()) {
+     		if (file.startsWith(skipPath)) {
+     			return false;
+     		}
+     	}
+        return true;
+     }
+
+
+     private static String which(String file) {
+        try {
+        	ProcessBuilder processBuilder = new ProcessBuilder(new String[] {"which", file});
+            Process process = processBuilder.start();
+    
+            BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String fullPath = stdout.readLine();
+
+            return (process.waitFor() == 0) ? fullPath : "";
+        } catch (Exception e) {}
+        return "";
+     }
+               
+     private static final char SINGLE_QUOTE = '\'';
+     private static final char DOUBLE_QUOTE = '"';
+     private static String[] splitArgs(String command) throws ParseException {
+        List<String> argv = new ArrayList<String>();
+
+        int beginIndex = 0;
+        
+        while (beginIndex < command.length()) {
+            // Skip spaces
+            while (Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+            
+            char delim = ' ';
+            char charAtIndex = command.charAt(beginIndex);
+            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+                delim = charAtIndex;
+            }
+            
+            int endIndex = command.indexOf(delim, beginIndex+1);
+            if (endIndex == -1) {
+                if (Character.isWhitespace(delim)) {
+                    // Reached end of command-line
+                    argv.add(command.substring(beginIndex));
+                    break;
+                } else {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+            }
+            
+            if (Character.isWhitespace(delim)) {
+                // Do not consume the space
+                argv.add(command.substring(beginIndex, endIndex));
+            } else {
+                argv.add(command.substring(beginIndex, endIndex+1));
+            }
+           
+            beginIndex = endIndex + 1;
+        }
+        
+        return argv.toArray(new String[argv.size()]);
+    }
+
 }
 
 
@@ -387,8 +501,15 @@
 TOKEN : { <EVAL : "eval"> }
 TOKEN : { <STREAM : "stream"> }
 TOKEN : { <THROUGH : "through"> }
-TOKEN : { <BACKTICK : "`"> }
 TOKEN : { <STORE : "store"> }
+TOKEN : { <SHIP: "ship"> }
+TOKEN : { <CACHE: "cache"> }
+TOKEN : { <INPUT: "input"> }
+TOKEN : { <OUTPUT: "output"> }
+TOKEN : { <ERROR: "stderr"> }
+TOKEN : { <STDIN: "stdin"> }
+TOKEN : { <STDOUT: "stdout"> }
+TOKEN : { <LIMIT: "limit"> }
 
 TOKEN:
 {
@@ -1254,43 +1375,104 @@
 	}
 }
 
-LogicalOperator StreamClause(): {LogicalOperator input; String streamingCommand;}
+LogicalOperator StreamClause(): 
+{
+	LogicalOperator input; 
+	StreamingCommand command;
+}
 {
 	input = NestedExpr()	
 	
-	<THROUGH> streamingCommand = StreamingCommand()
+	<THROUGH> command = Command()
 	{
-		return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new StreamSpec(streamingCommand));
+		return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), 
+		                  new StreamSpec(pigContext.createExecutableManager(),
+		                                 command)
+		                 );
 	}
 }
 
-String StreamingCommand(): {Token t;}
+StreamingCommand Command(): {Token t; StreamingCommand command;}
 {
 	t = <EXECCOMMAND>
 	{
-		return unquote(t.image);
+		String[] argv = splitArgs(unquote(t.image));
+		command = new StreamingCommand(argv);
+        checkAutoShipSpecs(command, argv);
+		return command;
+	}
+	|
+	t = <IDENTIFIER>
+	{
+		command = pigContext.getCommandForAlias(t.image);
+		if (command == null) {
+			throw new ParseException("Undefined command-alias: " + t.image + 
+			                         " used as stream operator");
+		}
+
+		return command;
 	}
 }
 
-LogicalOperator DefineClause() : {Token t; Token t1; String functionName, functionArgs;}
+LogicalOperator DefineClause() : {Token t; Token cmd; String functionName, functionArgs;}
 {
-	t = <IDENTIFIER>
-	(
+    t = <IDENTIFIER>
+    (
+    ( 
+        cmd = <EXECCOMMAND>
+        {
+            StreamingCommand command = 
+               new StreamingCommand(splitArgs(unquote(cmd.image)));
+            String[] paths;
+            StreamingCommand.HandleSpec[] handleSpecs;
+        }
+        (
+            <SHIP> "(" paths = PathList() ")" 
+            {
+                if (paths.length == 0) {
+                	command.setShipFiles(false);
+                } else {
+                    for (String path : paths) {
+                        command.addPathToShip(path);
+                    }
+                }
+            }
+            |
+            <CACHE> "(" paths = PathList() ")"
+            {
+                for (String path : paths) {
+                    command.addPathToCache(path);
+                }
+            }
+            |
+            <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
+            |
+            <OUTPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.OUTPUT) ")"
+            |
+            <ERROR> "(" ErrorSpec(command, t.image) ")"
+        )*
+        {
+            pigContext.registerStreamCmd(t.image, command); 
+        }
+    )
+    |
+    (
         functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
         {
-        	pigContext.registerFunction(t.image, 
-        	                            (functionName + "(" + functionArgs + ")"));
+            pigContext.registerFunction(t.image, 
+                                        (functionName + "(" + functionArgs + ")"));
         }
     )
+    )
     {
-    	// Return the dummy LODefine
-    	return new LODefine(opTable, scope, getNextId());
+        // Return the dummy LODefine
+        return new LODefine(opTable, scope, getNextId());
     }
 }
 
 LogicalOperator StoreClause() : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; String functionName, functionArgs;}
 {   
-	t = <IDENTIFIER> <INTO> fileName = FileName()
+    t = <IDENTIFIER> <INTO> fileName = FileName()
     (
         <USING> functionName = QualifiedFunction()
         {functionSpec = functionName;} 
@@ -1305,12 +1487,79 @@
         }
          
         LogicalPlan readFrom = aliases.get(t.image);
-       
+        String jobOutputFile = massageFilename(fileName, pigContext, false);
         lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
-                         new FileSpec(massageFilename(fileName, pigContext, false), 
-                                      functionSpec),
+                         new FileSpec(jobOutputFile, functionSpec),
                          false);
-            
+        pigContext.setJobOutputFile(jobOutputFile);
+        
         return lo;
     } 
+}
+
+String[] PathList() : {Token t; List<String> pathList = new ArrayList<String>();}
+{
+    (
+    (
+    t = <QUOTEDSTRING> {pathList.add(unquote(t.image));}
+    ( "," t = <QUOTEDSTRING> {pathList.add(unquote(t.image));} )*
+    )
+    | {}
+    )
+    {return pathList.toArray(new String[pathList.size()]);}
+}
+
+void InputOutputSpec(StreamingCommand command, StreamingCommand.Handle handle):
+{
+    String stream, deserializer;
+    StreamingCommand.HandleSpec[] handleSpecs;
+    String functionName, functionArgs;
+} 
+{
+    
+    stream = CommandStream() <USING> functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
+    {
+        deserializer = functionName + "(" + functionArgs + ")";
+        command.addHandleSpec(handle, 
+                              new HandleSpec(stream, deserializer)
+                             );
+    }
+    (
+        "," 
+        stream = CommandStream() <USING> functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
+        {
+        	deserializer = functionName + "(" + functionArgs + ")";
+            command.addHandleSpec(handle, 
+                                  new HandleSpec(stream, deserializer)
+                                 );
+        }
+    )* 
+}
+
+String CommandStream(): {Token t;}
+{
+    t = <STDIN>
+    {return "stdin";}
+    |
+    t = <STDOUT>
+    {return "stdout";}
+    |
+    t = <QUOTEDSTRING>
+    {return unquote(t.image);}
+}
+
+void ErrorSpec(StreamingCommand command, String alias): {Token t1, t2; int limit = StreamingCommand.MAX_TASKS;}
+{
+	(
+	t1 = <QUOTEDSTRING>
+	(<LIMIT> t2 = <NUMBER> {limit = Integer.parseInt(t2.image);})?
+	{
+		command.setLogDir(unquote(t1.image));
+		command.setLogFilesLimit(limit);
+	}
+	)
+	|
+	{
+        command.setLogDir(alias);
+	}
 }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java Thu Mar 27 09:54:10 2008
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.streaming;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.eval.collector.DataCollector;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-
-/**
- * {@link PigExecutableManager} manages an external executable which processes data
- * in a Pig query.
- * 
- * The <code>PigExecutableManager</code> is responsible for startup/teardown of the 
- * external process and also for managing it.
- * It feeds input records to the executable via its <code>stdin</code>, 
- * collects the output records from its <code>stdout</code> and also diagnostic 
- * information from its <code>stderr</code>.
- */
-public class PigExecutableManager {
-	private static final Log LOG = 
-		LogFactory.getLog(PigExecutableManager.class.getName());
-
-	String command;                            // Streaming command to be run
-	String[] argv;                             // Parsed/split commands 
-
-	Process process;                           // Handle to the process
-	private static final int SUCCESS = 0;      // Exit status of a successful run
-	
-	protected DataOutputStream stdin;          // stdin of the process
-	StoreFunc serializer;                      // serializer to be used to
-	                                           // send data to the process
-	
-	ProcessOutputThread stdoutThread;          // thread to get process output
-	InputStream stdout;                        // stdout of the process
-	LoadFunc deserializer;                     // deserializer to be used to
-	                                           // interpret the process' output
-	
-	ProcessErrorThread stderrThread;           // thread to drain process' stderr
-	InputStream stderr;                        // stderr of the process
-	
-	DataCollector endOfPipe;                   // receives tuples read from stdout
-
-	/**
-	 * Prepares to manage <code>command</code>; the process itself is only
-	 * started by {@link #run()}.
-	 *
-	 * @param command the command line to execute
-	 * @param deserializer parses the process' stdout into tuples
-	 * @param serializer writes tuples to the process' stdin
-	 * @param endOfPipe receives the tuples parsed from the process' stdout
-	 * @throws Exception if <code>command</code> has an unterminated quote
-	 */
-	public PigExecutableManager(String command, 
-			                    LoadFunc deserializer, StoreFunc serializer, 
-			                    DataCollector endOfPipe) throws Exception {
-		this.command = command;
-		
-		this.argv = splitArgs(this.command);
-		if (LOG.isDebugEnabled()) {
-		    for (String cmd : argv) {
-		        LOG.debug("argv: " + cmd);
-		    }
-		}
-		
-		this.deserializer = deserializer;
-		this.serializer = serializer;
-		this.endOfPipe = endOfPipe;
-	}
-
-	private static final char SINGLE_QUOTE = '\'';
-	private static final char DOUBLE_QUOTE = '"';
-
-	/**
-	 * Splits <code>command</code> into arguments. Unquoted arguments end at
-	 * the next space; an argument starting with a single or double quote runs
-	 * to the matching quote and is returned without the quotes.
-	 *
-	 * @throws ParseException if a quoted argument is not terminated
-	 */
-	private static String[] splitArgs(String command) throws Exception {
-		List<String> argv = new ArrayList<String>();
-		int length = command.length();
-		int beginIndex = 0;
-		
-		while (beginIndex < length) {
-			// Skip whitespace, stopping at the end of the command so that
-			// trailing whitespace cannot index past the last character
-			while (beginIndex < length &&
-			       Character.isWhitespace(command.charAt(beginIndex))) {
-				++beginIndex;
-			}
-			if (beginIndex == length) {
-				break;
-			}
-			
-			char delim = ' ';
-			char charAtIndex = command.charAt(beginIndex);
-			if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
-				delim = charAtIndex;
-			}
-			
-			int endIndex = command.indexOf(delim, beginIndex+1);
-			if (endIndex == -1) {
-				if (Character.isWhitespace(delim)) {
-					// Reached end of command-line
-					argv.add(command.substring(beginIndex));
-					break;
-				} else {
-					// Didn't find the ending quote/double-quote
-					throw new ParseException("Illegal command: " + command);
-				}
-			}
-			
-			if (Character.isWhitespace(delim)) {
-				// Exclude the separating space from the argument
-				argv.add(command.substring(beginIndex, endIndex));
-			} else {
-				// Exclude the surrounding quotes from the argument
-				argv.add(command.substring(beginIndex+1, endIndex));
-			}
-			
-			beginIndex = endIndex + 1;
-		}
-		
-		return argv.toArray(new String[0]);
-	}
-
-	/** Configuration hook; currently a no-op. */
-	public void configure() {
-	}
-
-	/**
-	 * Closes the process' stdin, waits for the process and for the
-	 * stdout/stderr threads to finish, then destroys the process.
-	 * Does nothing if {@link #run()} never started a process.
-	 *
-	 * @throws ExecException if the process did not exit with status 0
-	 *         (including when waiting for it was interrupted)
-	 */
-	public void close() throws Exception {
-		if (process == null) {
-			// run() was never called, or failed before starting the process
-			return;
-		}
-		
-		// Close the stdin to let the process terminate
-		if (stdin != null) {
-			stdin.flush();
-			stdin.close();
-			stdin = null;
-		}
-		
-		// Wait for the process to exit and the stdout/stderr threads to complete
-		int exitCode = -1;
-		try {
-			exitCode = process.waitFor();
-			
-			if (stdoutThread != null) {
-				stdoutThread.join(0);
-			}
-			if (stderrThread != null) {
-				stderrThread.join(0);
-			}
-
-		} catch (InterruptedException ie) {
-			// Preserve the interrupt status for our caller; exitCode remains
-			// -1 only if waitFor() itself was interrupted
-			Thread.currentThread().interrupt();
-		}
-
-		// Clean up the process
-		process.destroy();
-		
-		LOG.debug("Process exited with: " + exitCode);
-		if (exitCode != SUCCESS) {
-			throw new ExecException(command + " failed with exit status: " + 
-			                        exitCode);
-		}
-	}
-
-	/**
-	 * Starts the process, binds the serializer to its stdin and the
-	 * deserializer to its stdout, and starts the stdout/stderr threads.
-	 */
-	public void run() throws IOException {
-		// Run the executable
-		ProcessBuilder processBuilder = new ProcessBuilder(argv);
-		process = processBuilder.start();
-		LOG.debug("Started the process for command: " + command);
-
-		// Pick up the process' stdin/stdout/stderr streams
-		stdin = 
-			new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
-		stdout = 
-			new DataInputStream(new BufferedInputStream(process.getInputStream()));
-		stderr = 
-			new DataInputStream(new BufferedInputStream(process.getErrorStream()));
-
-		// Attach the serializer to the stdin of the process for sending tuples 
-		serializer.bindTo(stdin);
-		
-		// Attach the deserializer to the stdout of the process to get tuples
-		deserializer.bindTo("", new BufferedPositionedInputStream(stdout), 0, 
-				            Long.MAX_VALUE);
-		
-		// Start the threads to process the executable's stdout and stderr
-		stdoutThread = new ProcessOutputThread(deserializer);
-		stdoutThread.start();
-		stderrThread = new ProcessErrorThread();
-		stderrThread.start();
-	}
-
-	/**
-	 * Serializes <code>d</code> (which must be a {@link Tuple}) to the
-	 * process' stdin and flushes it.
-	 */
-	public void add(Datum d) throws IOException {
-		// Pass the serialized tuple to the executable
-		serializer.putNext((Tuple)d);
-		stdin.flush();
-	}
-
-	/**
-	 * Workhorse to process the stdout of the managed process.
-	 * 
-	 * The <code>PigExecutableManager</code>, by default, just pushes the received
-	 * <code>Datum</code> into eval-pipeline to be processed by the successor.
-	 * 
-	 * @param d <code>Datum</code> to process
-	 */
-	protected void processOutput(Datum d) {
-		endOfPipe.add(d);
-	}
-	
-	/** Reads tuples from the process' stdout and hands them to processOutput. */
-	class ProcessOutputThread extends Thread {
-
-		LoadFunc deserializer;
-
-		ProcessOutputThread(LoadFunc deserializer) {
-			setDaemon(true);
-			this.deserializer = deserializer;
-		}
-
-		public void run() {
-			try {
-				// Read tuples from the executable and push them down the pipe
-				Tuple tuple = null;
-				while ((tuple = deserializer.getNext()) != null) {
-					processOutput(tuple);
-				}
-
-				if (stdout != null) {
-					stdout.close();
-					LOG.debug("ProcessOutputThread done");
-				}
-			} catch (Throwable th) {
-				LOG.warn(th);
-				try {
-					if (stdout != null) {
-						stdout.close();
-					}
-				} catch (IOException ioe) {
-					LOG.info(ioe);
-				}
-				throw new RuntimeException(th);
-			}
-		}
-	}
-
-	/**
-	 * Workhorse to process the stderr stream of the managed process.
-	 * 
-	 * By default <code>PigExecutableManager</code> just sends out the received
-	 * error message to the <code>stderr</code> of itself.
-	 * 
-	 * @param error error message from the managed process.
-	 */
-	protected void processError(String error) {
-		// Just send it out to our stderr
-		System.err.println(error);
-	}
-	
-	/** Reads the process' stderr line by line and hands it to processError. */
-	class ProcessErrorThread extends Thread {
-
-		public ProcessErrorThread() {
-			setDaemon(true);
-		}
-
-		public void run() {
-			try {
-				String error;
-				BufferedReader reader = 
-					new BufferedReader(new InputStreamReader(stderr));
-				while ((error = reader.readLine()) != null) {
-					processError(error);
-				}
-
-				if (stderr != null) {
-					stderr.close();
-					LOG.debug("ProcessErrorThread done");
-				}
-			} catch (Throwable th) {
-				LOG.warn(th);
-				try {
-					if (stderr != null) {
-						stderr.close();
-					}
-				} catch (IOException ioe) {
-					LOG.info(ioe);
-				}
-				// Propagate the original failure, as ProcessOutputThread does
-				throw new RuntimeException(th);
-			}
-		}
-	}
-}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Thu Mar 27 09:54:10 2008
@@ -90,11 +90,15 @@
      * @param funcs
      *            the functions that will be used in a job and whose jar files need to be included
      *            in the final merged jar file.
+     * @param files files which will be used in the job and need to be shipped           
      * @return the temporary path to the merged jar file.
      * @throws ClassNotFoundException
      * @throws IOException
      */
-    public static void createJar(OutputStream os, List<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
+    public static void createJar(OutputStream os, 
+                                 List<String> funcs, List<String> files,
+                                 PigContext pigContext) 
+    throws ClassNotFoundException, IOException {
         Vector<JarListEntry> jarList = new Vector<JarListEntry>();
         for(String toSend: pigPackagesToSend) {
             addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
@@ -126,6 +130,11 @@
             jarFile.putNextEntry(new ZipEntry("pigContext"));
             new ObjectOutputStream(jarFile).writeObject(pigContext);
         }
+        
+        for (String file : files) {
+            addStream(jarFile, file, new FileInputStream(file), contents);
+        }
+        
         jarFile.close();
     }
 

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Thu Mar 27 09:54:10 2008
@@ -157,6 +157,9 @@
             //mPigServer.setJobName(unquote(value));
             mPigServer.setJobName(value);
         }
+        else if (key.equals("stream.skippath")) {
+            mPigServer.addPathToSkip(value);
+        }
         else
         {
             // other key-value pairs can go there

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu Mar 27 09:54:10 2008
@@ -18,6 +18,9 @@
 package org.apache.pig.test;
 
 import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -25,6 +28,9 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.*;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+
 import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
 
 import junit.framework.TestCase;
@@ -36,7 +42,7 @@
 	private static final String simpleEchoStreamingCommand = 
 		"perl -ne 'chomp $_; print \"$_\n\"'";
 
-	private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
+    private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
 		Assert.assertEquals(firstField.length, secondField.length);
 		
 		Tuple[] expectedResults = new Tuple[firstField.length];
@@ -65,14 +71,14 @@
 			setupExpectedResults(expectedFirstFields, expectedSecondFields);
 
 		// Pig query to run
-		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
-		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
-		pigServer.registerQuery("OUTPUT = stream FILTERED_DATA through `" +
+		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+		pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
-		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
 	}
 
 	@Test
@@ -91,15 +97,15 @@
 			setupExpectedResults(expectedFirstFields, expectedSecondFields);
 
 		// Pig query to run
-		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
-		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
 		pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
 				                simpleEchoStreamingCommand + "` as (f0, f1);");
-		pigServer.registerQuery("OUTPUT = filter STREAMED_DATA by f1 > '6';");
+		pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > '6';");
 		
 		// Run the query and check the results
-		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
 	}
 
 	@Test
@@ -118,17 +124,17 @@
 			setupExpectedResults(expectedFirstFields, expectedSecondFields);
 
 		// Pig query to run
-		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
-		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
 		pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
 		pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
 				                "generate flatten($1);");
-		pigServer.registerQuery("OUTPUT = stream FLATTENED_GROUPED_DATA through `" +
+		pigServer.registerQuery("OP = stream FLATTENED_GROUPED_DATA through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
-		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
 	}
 
 	@Test
@@ -160,19 +166,206 @@
 			setupExpectedResults(expectedFirstFields, expectedSecondFields);
 
 		// Pig query to run
-		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
-		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
-		pigServer.registerQuery("GROUPED_DATA = group INPUT by $0;");
+		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+		pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
 		pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
-				                "  D = order INPUT BY $2, $3;" +
+				                "  D = order IP BY $2, $3;" +
                                 "  generate flatten(D);" +
                                 "};");
-		pigServer.registerQuery("OUTPUT = stream ORDERED_DATA through `" +
+		pigServer.registerQuery("OP = stream ORDERED_DATA through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
-		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
 	}
+
+    /**
+     * Streams the filtered input through a shipped Perl script that reads its
+     * records from the file named by its first argument ('foo', declared via
+     * input(...)) and echoes each record to stdout, with a copy to stderr.
+     */
+    @Test
+    public void testInputShipSpecs() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `." + command + " foo` " +
+                "ship ('" + command + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        try {
+            // Read back the stored output, closing the stream even on failure
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+            try {
+                PigStorage ps = new PigStorage(",");
+                ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+                Tuple t;
+                while ((t = ps.getNext()) != null) {
+                    outputs.add(t);
+                }
+            } finally {
+                op.close();
+            }
+
+            // Check the results
+            Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+        } finally {
+            // Always remove the fixed output path, which the other streaming
+            // tests reuse, even when the check above fails
+            pigServer.deleteFile(output);
+        }
+    }
+
+    /**
+     * Streams the filtered input (via stdin) through a shipped Perl script
+     * that writes "A,10" per record to the file named by its first argument
+     * ('foo') and a secondary line per record to the file named by its second
+     * argument ('bar'); both are declared via output(...).
+     */
+    @Test
+    public void testOutputShipSpecs() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script; each die message names the file that failed to open
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "while (<STDIN>) {",
+                          "  print OUTFILE \"A,10\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "  print OUTFILE2 \"Secondary Output: $_\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "A", "A", "A", "A", "A"};
+        int[] expectedSecondFields = new int[] {10, 10, 10, 10, 10, 10};
+        Tuple[] expectedResults = 
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `." + command + " foo bar` " +
+                "ship ('" + command + "') " +
+                "output('foo' using " + PigStorage.class.getName() + "(','), " +
+                "'bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+        
+        String output = "/pig/out";
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        try {
+            // Read back the stored output, closing the stream even on failure
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+            try {
+                PigStorage ps = new PigStorage(",");
+                ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+                Tuple t;
+                while ((t = ps.getNext()) != null) {
+                    outputs.add(t);
+                }
+            } finally {
+                op.close();
+            }
 	
+            // Check the results
+            Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+        } finally {
+            // Always remove the fixed output path, which the other streaming
+            // tests reuse, even when the check above fails
+            pigServer.deleteFile(output);
+        }
+    }
+
+    /**
+     * Streams the filtered input through a shipped Perl script that reads
+     * records from the file named by its first argument ('foo', declared via
+     * input(...)), echoes them to the file named by its second argument
+     * ('bar') and writes a secondary line per record to the third ('foobar');
+     * 'bar' and 'foobar' are declared via output(...).
+     */
+    @Test
+    public void testInputOutputSpecsWithAutoShip() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print OUTFILE \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "  print OUTFILE2 \"Secondary Output: $_\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `." + command + " foo bar foobar` " +
+                "ship ('" + command + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "output('bar' using " + PigStorage.class.getName() + "(','), " +
+                "'foobar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+        
+        String output = "/pig/out";
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        try {
+            // Read back the stored output, closing the stream even on failure
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+            try {
+                PigStorage ps = new PigStorage(",");
+                ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+                Tuple t;
+                while ((t = ps.getNext()) != null) {
+                    outputs.add(t);
+                }
+            } finally {
+                op.close();
+            }
+
+            // Check the results
+            Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+        } finally {
+            // Always remove the fixed output path, which the other streaming
+            // tests reuse, even when the check above fails
+            pigServer.deleteFile(output);
+        }
+    }
 }