Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/04 23:25:43 UTC

svn commit: r692253 [1/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/ex...

Author: olga
Date: Thu Sep  4 14:25:41 2008
New Revision: 692253

URL: http://svn.apache.org/viewvc?rev=692253&view=rev
Log:
merged streaming into types branch

Added:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
    incubator/pig/branches/types/test/org/apache/pig/test/Util.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Sep  4 14:25:41 2008
@@ -344,6 +344,7 @@
                     <exclude name="**/TestStoreOld.java" />
                     -->
                     <!-- Excluded because we don't want to run them -->
+                    <exclude name="**/PigExecTestCase.java" />
                     <exclude name="**/TypeCheckingTestUtil.java" />
                     <exclude name="**/TypeGraphPrinter.java" />
                     <exclude name="**/LogicalPlanTester.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Sep  4 14:25:41 2008
@@ -65,6 +65,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.SplitIntroducer;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -145,6 +146,16 @@
     }
     
     /**
+     * Add a path to be skipped while automatically shipping binaries for 
+     * streaming.
+     *  
+     * @param path path to be skipped
+     */
+    public void addPathToSkip(String path) {
+        pigContext.addPathToSkip(path);
+    }
+    
+    /**
      * Defines an alias for the given function spec. This
      * is useful for functions that require arguments to the 
      * constructor.
@@ -171,6 +182,16 @@
         pigContext.registerFunction(function, funcSpec);
     }
     
+    /**
+     * Defines an alias for the given streaming command.
+     * 
+     * @param commandAlias - the new command alias to define
+     * @param command - streaming command to be executed
+     */
+    public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
+        pigContext.registerStreamCmd(commandAlias, command);
+    }
+
     private URL locateJarFromResources(String jarName) throws IOException {
         Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
         URL resourceLocation = null;
@@ -395,7 +416,7 @@
 
             stream.println("-----------------------------------------------");
             pigContext.getExecutionEngine().explain(pp, stream);
-        
+      
         } catch (Exception e) {
             throw WrappedIOException.wrap("Unable to explain alias " +
                 alias, e);

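For reference, a minimal usage sketch for the two PigServer methods added above. The PigServer and StreamingCommand constructor arguments shown are assumptions (their signatures are not part of this diff); only addPathToSkip() and registerStreamingCommand() come from this commit.

    import org.apache.pig.PigServer;
    import org.apache.pig.impl.streaming.StreamingCommand;

    public class StreamingSetupSketch {
        public static void main(String[] args) throws Exception {
            // exec type string is an assumption
            PigServer pigServer = new PigServer("mapreduce");

            // never auto-ship binaries found under this path
            pigServer.addPathToSkip("/usr/bin");

            // alias "myfilter" for a streaming command; this
            // StreamingCommand constructor is an assumption
            StreamingCommand filter =
                new StreamingCommand(null, new String[] { "perl", "filter.pl" });
            pigServer.registerStreamingCommand("myfilter", filter);
        }
    }
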
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Sep  4 14:25:41 2008
@@ -210,6 +210,17 @@
                     POPostCombinerPackage newPack =
                         new POPostCombinerPackage(pack, bags);
                     mr.reducePlan.replace(pack, newPack);
+                    
+                    // the replace() above only changes
+                    // the plan and does not update the "inputs" of
+                    // downstream operators, so set up the "inputs" of
+                    // the operator after the package correctly
+                    List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
+                    packList.add(newPack);
+                    List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newPack);
+                    // there should be only one successor to package
+                    sucs.get(0).setInputs(packList);
                 } catch (Exception e) {
                     throw new VisitorException(e);
                 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Sep  4 14:25:41 2008
@@ -20,15 +20,21 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -59,10 +65,12 @@
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * This is compiler class that takes an MROperPlan and converts
@@ -79,6 +87,8 @@
     
     private final Log log = LogFactory.getLog(getClass());
 
+    public static final String LOG_DIR = "_logs";
+
     /**
      * The map between MapReduceOpers and their corresponding Jobs
      */
@@ -188,7 +198,7 @@
      */
     private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
         JobConf jobConf = new JobConf(conf);
-        ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+        ArrayList<Pair<FileSpec, Boolean>> inp = new ArrayList<Pair<FileSpec, Boolean>>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
         
         //Set the User Name for this job. This will be
@@ -201,8 +211,11 @@
         if(lds!=null && lds.size()>0){
             for (PhysicalOperator operator : lds) {
                 POLoad ld = (POLoad)operator;
+                
+                Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
                 //Store the inp filespecs
-                inp.add(ld.getLFile());
+                inp.add(p);
+                
                 //Store the target operators for tuples read
                 //from this input
                 List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
@@ -229,6 +242,12 @@
             jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
             jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
     
+            // Setup the DistributedCache for this job
+            setupDistributedCache(pigContext, jobConf, pigContext.getProperties(), 
+                                  "pig.streaming.ship.files", true);
+            setupDistributedCache(pigContext, jobConf, pigContext.getProperties(), 
+                                  "pig.streaming.cache.files", false);
+
             jobConf.setInputFormat(PigInputFormat.class);
             jobConf.setOutputFormat(PigOutputFormat.class);
             
@@ -247,17 +266,39 @@
             FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
             FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
             jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+
+            // Setup the logs directory for streaming jobs
+            jobConf.set("pig.streaming.log.dir", 
+                        new Path(new Path(outputPath), LOG_DIR).toString());
+            jobConf.set("pig.streaming.task.output.dir", outputPath);
+
             
             // store map key type
             // this is needed when the key is null to create
             // an appropriate NullableXXXWritable object
             jobConf.set("pig.map.keytype", ObjectSerializer.serialize(new byte[] { mro.mapKeyType }));
+
+            // set parent plan in all operators in map and reduce plans
+            // currently the parent plan is really used only when POStream is present in the plan
+            PhysicalPlan[] plans = new PhysicalPlan[] { mro.mapPlan, mro.reducePlan };
+            for (int i = 0; i < plans.length; i++) {
+                for (Iterator<PhysicalOperator> it = plans[i].iterator(); it.hasNext();) {
+                    PhysicalOperator op = it.next();
+                    op.setParentPlan(plans[i]);                
+                }    
+            }
             
             if(mro.reducePlan.isEmpty()){
                 //MapOnly Job
                 jobConf.setMapperClass(PigMapOnly.Map.class);
                 jobConf.setNumReduceTasks(0);
                 jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+                if(mro.isStreamInMap()) {
+                    // this is used in Map.close() to decide whether the
+                    // pipeline needs to be rerun one more time in close().
+                    // The pipeline is rerun only if there was a stream
+                    jobConf.set("pig.stream.in.map", "true");
+                }
             }
             else{
                 //Map Reduce Job
@@ -277,14 +318,26 @@
                     jobConf.setNumReduceTasks(mro.requestedParallelism);
 
                 jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+                if(mro.isStreamInMap()) {
+                    // this is used in Map.close() to decide whether the
+                    // pipeline needs to be rerun one more time in close().
+                    // The pipeline is rerun only if there was a stream
+                    jobConf.set("pig.stream.in.map", "true");
+                }
                 jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+                if(mro.isStreamInReduce()) {
+                    // this is used in Reduce.close() to decide whether the
+                    // pipeline needs to be rerun one more time in close().
+                    // The pipeline is rerun only if there was a stream
+                    jobConf.set("pig.stream.in.reduce", "true");
+                }
                 jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                 Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
                 jobConf.setOutputKeyClass(keyClass);
                 selectComparator(mro, pack.getKeyType(), jobConf);
                 jobConf.setOutputValueClass(IndexedTuple.class);
             }
-            
+        
             if(mro.isGlobalSort()){
                 jobConf.set("pig.quantilesFile", mro.getQuantFile());
                 jobConf.setPartitionerClass(SortPartitioner.class);
@@ -299,7 +352,6 @@
                         ObjectSerializer.serialize(mro.getSortOrder()));
                 }
             }
-    
             return jobConf;
         }catch(Exception e){
             JobCreationException jce = new JobCreationException(e);
@@ -474,4 +526,56 @@
         }
     }
 
+    private static void setupDistributedCache(PigContext pigContext,
+                                              Configuration conf, 
+                                              Properties properties, String key, 
+                                              boolean shipToCluster) 
+    throws IOException {
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
+
+        // Set up the DistributedCache for this job        
+        String fileNames = properties.getProperty(key);
+        if (fileNames != null) {
+            String[] paths = fileNames.split(",");
+            
+            for (String path : paths) {
+                path = path.trim();
+                if (path.length() != 0) {
+                    Path src = new Path(path);
+                    
+                    // Ensure that 'src' is a valid URI
+                    URI srcURI = null;
+                    try {
+                        srcURI = new URI(src.toString());
+                    } catch (URISyntaxException ue) {
+                        throw new IOException("Invalid cache specification, " +
+                                              "not a valid URI: " + src);
+                    }
+                    
+                    // Ship it to the cluster if necessary and add to the
+                    // DistributedCache
+                    if (shipToCluster) {
+                        Path dst = 
+                            new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString());
+                        FileSystem fs = dst.getFileSystem(conf);
+                        fs.copyFromLocalFile(src, dst);
+                        
+                        // Construct the dst#srcName uri for DistributedCache
+                        URI dstURI = null;
+                        try {
+                            dstURI = new URI(dst.toString() + "#" + src.getName());
+                        } catch (URISyntaxException ue) {
+                            throw new IOException("Invalid ship specification, " +
+                                                  "not a valid URI: " + dst);
+                        }
+                        DistributedCache.addCacheFile(dstURI, conf);
+                    } else {
+                        DistributedCache.addCacheFile(srcURI, conf);
+                    }
+                }
+            }
+        }
+    }
+
 }

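The two property keys read by setupDistributedCache() above are plain comma-separated lists (split on "," and trimmed). A minimal sketch of populating them; the paths are illustrative:

    import java.util.Properties;

    public class ShipCachePropsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // shipToCluster == true: each file is copied to a temporary DFS
            // path and added to the DistributedCache as "dst#srcName", so it
            // appears under its own name in the task's working directory
            props.setProperty("pig.streaming.ship.files",
                              "/local/bin/filter.pl, /local/lib/util.pm");
            // shipToCluster == false: entries must already be valid URIs on
            // the cluster and are added to the DistributedCache as-is
            props.setProperty("pig.streaming.cache.files",
                              "/dfs/data/lookup.dat#lookup.dat");
        }
    }
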
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep  4 14:25:41 2008
@@ -58,6 +58,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -268,7 +269,7 @@
     }
     
     private POLoad getLoad(){
-        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)), true);
         ld.setPc(pigContext);
         return ld;
     }
@@ -301,6 +302,7 @@
      * @throws IOException
      */
     private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+        
         if (compiledInputs.length == 1) {
             //For speed
             MapReduceOper mro = compiledInputs[0];
@@ -622,6 +624,16 @@
         }
     }
     
+    public void visitStream(POStream op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
     public void visitLimit(POLimit op) throws VisitorException{
         try{
         	

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Sep  4 14:25:41 2008
@@ -35,6 +35,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -128,11 +129,16 @@
             CombinerOptimizer co = new CombinerOptimizer(plan);
             co.visit();
         }
+        
+        // check whether stream operator is present
+        MRStreamHandler checker = new MRStreamHandler(plan);
+        checker.visit();
+        
         // figure out the type of the key for the map plan
         // this is needed when the key is null to create
         // an appropriate NullableXXXWritable object
         KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
-        kdv.visit();     
+        kdv.visit();
         return plan;
     }
  

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Sep  4 14:25:41 2008
@@ -66,6 +66,14 @@
     //is complete
     boolean reduceDone = false;
     
+    // Indicates that there is POStream in the 
+    // map plan
+    boolean streamInMap = false;
+    
+    // Indicates that there is POStream in the 
+    // reduce plan
+    boolean streamInReduce = false;
+    
     //Indicates if this job is an order by job
     boolean globalSort = false;
     
@@ -230,4 +238,32 @@
     public boolean[] getSortOrder() {
         return sortOrder;
     }
+
+    /**
+     * @return whether there is a POStream in the map plan
+     */
+    public boolean isStreamInMap() {
+        return streamInMap;
+    }
+
+    /**
+     * @param streamInMap the streamInMap to set
+     */
+    public void setStreamInMap(boolean streamInMap) {
+        this.streamInMap = streamInMap;
+    }
+
+    /**
+     * @return whether there is a POStream in the reduce plan
+     */
+    public boolean isStreamInReduce() {
+        return streamInReduce;
+    }
+
+    /**
+     * @param streamInReduce the streamInReduce to set
+     */
+    public void setStreamInReduce(boolean streamInReduce) {
+        this.streamInReduce = streamInReduce;
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Sep  4 14:25:41 2008
@@ -52,6 +52,7 @@
 import org.apache.pig.impl.io.ValidatingInputFileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 
 public class PigInputFormat implements InputFormat<Text, TargetedTuple>,
         JobConfigurable {
@@ -175,7 +176,7 @@
      */
     public InputSplit[] getSplits(JobConf job, int numSplits)
             throws IOException {
-        ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer
+        ArrayList<Pair<FileSpec, Boolean>> inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
                 .deserialize(job.get("pig.inputs"));
         ArrayList<ArrayList<OperatorKey>> inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                 .deserialize(job.get("pig.inpTargets"));
@@ -184,7 +185,7 @@
         
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
         for (int i = 0; i < inputs.size(); i++) {
-            Path path = new Path(inputs.get(i).getFileName());
+            Path path = new Path(inputs.get(i).first.getFileName());
             FileSystem fs = path.getFileSystem(job);
             // if the execution is against Mapred DFS, set
             // working dir to /user/<userid>
@@ -193,10 +194,14 @@
             
             DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
             ValidatingInputFileSpec spec;
-            if (inputs.get(i) instanceof ValidatingInputFileSpec) {
-                spec = (ValidatingInputFileSpec) inputs.get(i);
+            if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
+                spec = (ValidatingInputFileSpec) inputs.get(i).first;
             } else {
-                spec = new ValidatingInputFileSpec(inputs.get(i), store);
+                spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
+            }
+            boolean isSplittable = inputs.get(i).second;
+            if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+                ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
             }
             Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
             for (Slice split : pigs) {

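The splittable flag now rides along with each input as the second element of a Pair, serialized into the job conf under "pig.inputs" by JobControlCompiler and deserialized here. A minimal round-trip sketch; the FileSpec constructor arguments are assumptions:

    import java.util.ArrayList;
    import org.apache.pig.impl.io.FileSpec;
    import org.apache.pig.impl.util.ObjectSerializer;
    import org.apache.pig.impl.util.Pair;

    public class PigInputsRoundTripSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws Exception {
            ArrayList<Pair<FileSpec, Boolean>> inp =
                new ArrayList<Pair<FileSpec, Boolean>>();
            // the Boolean mirrors POLoad.isSplittable(); the FileSpec
            // construction shown here is an assumption
            inp.add(new Pair<FileSpec, Boolean>(new FileSpec("/data/input", null), true));
            String conf = ObjectSerializer.serialize(inp);   // jobConf.set("pig.inputs", ...)
            ArrayList<Pair<FileSpec, Boolean>> back = (ArrayList<Pair<FileSpec, Boolean>>)
                ObjectSerializer.deserialize(conf);          // getSplits() side
            System.out.println(back.get(0).second);          // true -> PigSlicer.setSplittable(true)
        }
    }
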
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Sep  4 14:25:41 2008
@@ -17,29 +17,31 @@
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.WrappedIOException;
 
 public abstract class PigMapBase extends MapReduceBase{
     private final Log log = LogFactory.getLog(getClass());
 
     protected byte keyType;
     
+    
     //Map Plan
     protected PhysicalPlan mp;
     
+    OutputCollector<WritableComparable, Writable> outputCollector;
+    
     // Reporter that will be used by operators
     // to transmit heartbeat
     ProgressableReporter pigReporter;
+
+    private boolean errorInMap = false;
+    
     
     /**
      * Will be called when all the tuples in the input
@@ -49,7 +51,31 @@
     public void close() throws IOException {
         super.close();
         PhysicalOperator.setReporter(null);
+        if(errorInMap) {
+            //error in map - returning
+            return;
+        }
+            
+        if(PigMapReduce.sJobConf.get("pig.stream.in.map", "false").equals("true")) {
+            // If there is a stream in the pipeline we could
+            // potentially have more to process - so let's
+            // set the flag stating that all map input has been sent
+            // already and then let's run the pipeline one more time.
+            // This will result in nothing happening in the case
+            // where there is no stream in the pipeline
+            mp.endOfAllInput = true;
+            List<PhysicalOperator> leaves = mp.getLeaves();
+            PhysicalOperator leaf = leaves.get(0);
+            try {
+                runPipeline(leaf);
+            } catch (ExecException e) {
+                 IOException ioe = new IOException("Error running pipeline in close() of map");
+                 ioe.initCause(e);
+                 throw ioe;
+            }
+        }
         mp = null;
+        
     }
 
     /**
@@ -98,6 +124,9 @@
             OutputCollector<WritableComparable, Writable> oc,
             Reporter reporter) throws IOException {
         
+        // cache the collector for use in runPipeline() which
+        // can be called from close()
+        this.outputCollector = oc;
         pigReporter.setRep(reporter);
         PhysicalOperator.setReporter(pigReporter);
         
@@ -113,35 +142,19 @@
         }
         
         for (OperatorKey targetKey : inpTuple.targetOps) {
+            
             PhysicalOperator target = mp.getOperator(targetKey);
             Tuple t = inpTuple.toTuple();
             target.attachInput(t);
         }
-        
         List<PhysicalOperator> leaves = mp.getLeaves();
         
         PhysicalOperator leaf = leaves.get(0);
         
+        
         try {
-            while(true){                
-                Result res = leaf.getNext(inpTuple);
-                if(res.returnStatus==POStatus.STATUS_OK){
-                    collect(oc,(Tuple)res.result);
-                    continue;
-                }
-                
-                if(res.returnStatus==POStatus.STATUS_EOP)
-                    return;
-                
-                if(res.returnStatus==POStatus.STATUS_NULL)
-                    continue;
-                
-                if(res.returnStatus==POStatus.STATUS_ERR){
-                    IOException ioe = new IOException("Received Error while " +
-                            "processing the map plan.");
-                    throw ioe;
-                }
-            }
+            runPipeline(leaf);
+            
         } catch (ExecException e) {
             IOException ioe = new IOException(e.getMessage());
             ioe.initCause(e.getCause());
@@ -149,6 +162,43 @@
         }
     }
 
+    private void runPipeline(PhysicalOperator leaf) throws IOException, ExecException {
+        Tuple dummyTuple = null;
+        while(true){
+            Result res = leaf.getNext(dummyTuple);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                collect(outputCollector,(Tuple)res.result);
+                continue;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL)
+                continue;
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                // remember that we had an issue so that in 
+                // close() we can do the right thing
+                errorInMap  = true;
+                // if there is an error message, use it
+                String errMsg;
+                if(res.result != null) {
+                    errMsg = "Received Error while " +
+                    "processing the map plan: " + res.result;
+                } else {
+                    errMsg = "Received Error while " +
+                    "processing the map plan.";
+                }
+                    
+                IOException ioe = new IOException(errMsg);
+                throw ioe;
+            }
+        }
+        
+    }
+
     abstract public void collect(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException;
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Sep  4 14:25:41 2008
@@ -100,6 +100,10 @@
         private POPackage pack;
         
         ProgressableReporter pigReporter;
+
+        private OutputCollector<WritableComparable, Writable> outputCollector;
+
+        private boolean errorInReduce = false;
         
         /**
          * Configures the Reduce plan, the POPackage operator
@@ -144,6 +148,9 @@
                 OutputCollector<WritableComparable, Writable> oc,
                 Reporter reporter) throws IOException {
             
+            // cache the collector for use in runPipeline()
+            // which could additionally be called from close()
+            this.outputCollector = oc;
             pigReporter.setRep(reporter);
             
             Object k = HDataType.convertToPigType(key);
@@ -160,31 +167,14 @@
                         return;
                     }
                     
+                    log.info("Attaching " + packRes + " to " + rp.getRoots());
                     rp.attachInput(packRes);
 
                     List<PhysicalOperator> leaves = rp.getLeaves();
                     
                     PhysicalOperator leaf = leaves.get(0);
-                    while(true){
-                        Result redRes = leaf.getNext(t);
-                        if(redRes.returnStatus==POStatus.STATUS_OK){
-                            oc.collect(null, (Tuple)redRes.result);
-                            continue;
-                        }
-                        
-                        if(redRes.returnStatus==POStatus.STATUS_EOP) {
-                            return;
-                        }
-                        
-                        if(redRes.returnStatus==POStatus.STATUS_NULL)
-                            continue;
-                        
-                        if(redRes.returnStatus==POStatus.STATUS_ERR){
-                            IOException ioe = new IOException("Received Error while " +
-                                    "processing the reduce plan.");
-                            throw ioe;
-                        }
-                    }
+                    runPipeline(leaf);
+                    
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_NULL) {
@@ -204,6 +194,49 @@
             }
         }
         
+        /**
+         * @param leaf
+         * @throws ExecException 
+         * @throws IOException 
+         */
+        private void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+            while(true)
+            {
+                Tuple dummyTuple = null;  
+                Result redRes = leaf.getNext(dummyTuple);
+                if(redRes.returnStatus==POStatus.STATUS_OK){
+                    outputCollector.collect(null, (Tuple)redRes.result);
+                    continue;
+                }
+                
+                if(redRes.returnStatus==POStatus.STATUS_EOP) {
+                    return;
+                }
+                
+                if(redRes.returnStatus==POStatus.STATUS_NULL)
+                    continue;
+                
+                if(redRes.returnStatus==POStatus.STATUS_ERR){
+                    // remember that we had an issue so that in 
+                    // close() we can do the right thing
+                    errorInReduce   = true;
+                    // if there is an error message, use it
+                    String errMsg;
+                    if(redRes.result != null) {
+                        errMsg = "Received Error while " +
+                        "processing the reduce plan: " + redRes.result;
+                    } else {
+                        errMsg = "Received Error while " +
+                        "processing the reduce plan.";
+                    }
+                    
+                    IOException ioe = new IOException(errMsg);
+                    throw ioe;
+                }
+            }
+
+        
+        }
         
         /**
          * Will be called once all the intermediate keys and values are
@@ -215,6 +248,30 @@
             /*if(runnableReporter!=null)
                 runnableReporter.setDone(true);*/
             PhysicalOperator.setReporter(null);
+            
+            if(errorInReduce) {
+                // there was an error in reduce - just return
+                return;
+            }
+            
+            if(PigMapReduce.sJobConf.get("pig.stream.in.reduce", "false").equals("true")) {
+                // If there is a stream in the pipeline we could
+                // potentially have more to process - so let's
+                // set the flag stating that all reduce input has been
+                // sent already and then let's run the pipeline one more time.
+                // This will result in nothing happening in the case
+                // where there is no stream in the pipeline
+                rp.endOfAllInput = true;
+                List<PhysicalOperator> leaves = rp.getLeaves();
+                PhysicalOperator leaf = leaves.get(0);
+                try {
+                    runPipeline(leaf);
+                } catch (ExecException e) {
+                     IOException ioe = new IOException("Error running pipeline in close() of reduce");
+                     ioe.initCause(e);
+                     throw ioe;
+                }
+            }
         }
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Sep  4 14:25:41 2008
@@ -728,6 +728,27 @@
     }
 
     @Override
+    public void visit(LOStream stream) throws VisitorException {
+        String scope = stream.getOperatorKey().scope;
+        POStream poStream = new POStream(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), stream.getExecutableManager(), 
+                stream.getStreamingCommand(), this.pc.getProperties());
+        currentPlan.add(poStream);
+        LogToPhyMap.put(stream, poStream);
+        
+        List<LogicalOperator> op = stream.getPlan().getPredecessors(stream);
+
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
+        try {
+            currentPlan.connect(from, poStream);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan: "
+                    + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+
+    @Override
     public void visit(LOProject op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
         POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
@@ -971,7 +992,7 @@
         // This would be a root operator. We don't need to worry about finding
         // its predecessors
         POLoad load = new POLoad(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)));
+                .getNextNodeId(scope)), loLoad.isSplittable());
         load.setLFile(loLoad.getInputFile());
         load.setPc(pc);
         load.setResultType(loLoad.getType());

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Thu Sep  4 14:25:41 2008
@@ -26,5 +26,9 @@
 
     public static final byte STATUS_EOP = 3; // end of processing
 
+    // This is currently only used in communications 
+    // between ExecutableManager and POStream
+    public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
+
     public static Object result;
 }

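A hedged sketch of the distinction the new status byte introduces; OutputSource and getNextOutput() are hypothetical stand-ins for the ExecutableManager/POStream hand-off, not APIs from this commit:

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;

    public class StatusHandlingSketch {
        interface OutputSource { Result getNextOutput(); } // hypothetical

        static void drain(OutputSource binary) throws Exception {
            while (true) {
                Result r = binary.getNextOutput();
                if (r.returnStatus == POStatus.STATUS_OK) {
                    continue;  // one tuple of output from the streaming binary
                } else if (r.returnStatus == POStatus.STATUS_EOP) {
                    return;    // no more output for now; more input may follow
                } else if (r.returnStatus == POStatus.STATUS_EOS) {
                    return;    // the binary has exited; its output is complete
                } else if (r.returnStatus == POStatus.STATUS_ERR) {
                    throw new Exception("error from streaming binary: " + r.result);
                }
            }
        }
    }
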
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Sep  4 14:25:41 2008
@@ -30,6 +30,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -75,6 +76,9 @@
     // The data type for the results of this operator
     protected byte resultType = DataType.TUPLE;
 
+    // The physical plan this operator is part of
+    protected PhysicalPlan parentPlan;
+    
     // Specifies if the input has been directly attached
     protected boolean inputAttached = false;
 
@@ -294,4 +298,12 @@
     protected void cloneHelper(PhysicalOperator op) {
         resultType = op.resultType;
     }
+
+    /**
+     * @param physicalPlan
+     */
+    public void setParentPlan(PhysicalPlan physicalPlan) {
+       parentPlan = physicalPlan;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java Thu Sep  4 14:25:41 2008
@@ -25,6 +25,13 @@
      */
     private static final long serialVersionUID = 1L;
 
+    /*
+     * When returnStatus is set to POStatus.STATUS_ERR,
+     * operators can choose to use the result field
+     * to hold a meaningful error message, which will be
+     * printed in the final message shown to the user.
+     */
+    
     public byte returnStatus;
 
     public Object result;

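A minimal sketch of the convention described above; the message text is illustrative. runPipeline() in PigMapBase and PigMapReduce appends res.result to the error reported to the user:

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;

    public class ErrorResultSketch {
        // builds a Result whose message surfaces as
        // "Received Error while processing the map plan: <message>"
        static Result error(String message) {
            Result res = new Result();
            res.returnStatus = POStatus.STATUS_ERR;
            res.result = message;
            return res;
        }
    }
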
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep  4 14:25:41 2008
@@ -210,4 +210,13 @@
         //do nothing
     }
 
+    /**
+     * @param stream
+     * @throws VisitorException 
+     */
+    public void visitStream(POStream stream) throws VisitorException {
+        //do nothing
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Thu Sep  4 14:25:41 2008
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -44,6 +45,12 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+    
+    // marker to indicate whether all input for this plan
+    // has been sent - this is currently only used in POStream
+    // to know if all map() calls and reduce() calls are finished
+    // and that there is no more input expected.
+    public boolean endOfAllInput = false;
 
     public PhysicalPlan() {
         super();
@@ -78,6 +85,7 @@
     @Override
     public void connect(PhysicalOperator from, PhysicalOperator to)
             throws PlanException {
+        
         super.connect(from, to);
         to.setInputs(getPredecessors(to));
     }
@@ -95,12 +103,32 @@
         List<PhysicalOperator> sucs = getSuccessors(op);
         if(sucs!=null && sucs.size()!=0){
             for (PhysicalOperator suc : sucs) {
-                suc.setInputs(null);
+                // a successor could have multiple inputs
+                // (for example, POUnion) - remove op from
+                // its list of inputs; if after removal
+                // there are no other inputs, set the successor's
+                // inputs to null
+                List<PhysicalOperator> succInputs = suc.getInputs();
+                succInputs.remove(op);
+                if(succInputs.size() == 0)
+                    suc.setInputs(null);
+                else
+                    suc.setInputs(succInputs);
             }
         }
         super.remove(op);
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.OperatorPlan#add(org.apache.pig.impl.plan.Operator)
+    @Override
+    public void add(PhysicalOperator op) {
+        // attach this plan as the plan the operator is part of
+        //op.setParentPlan(this);
+        super.add(op);
+    }
+*/
+
     public boolean isEmpty() {
         return (mOps.size() == 0);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Sep  4 14:25:41 2008
@@ -60,20 +60,23 @@
     PigContext pc;
     //Indicates whether the loader setup is done or not
     boolean setUpDone = false;
+    // Indicates whether the filespec is splittable
+    boolean splittable = true;
     
     private final Log log = LogFactory.getLog(getClass());
     
-    public POLoad(OperatorKey k) {
-        this(k,-1, null);
+    public POLoad(OperatorKey k, boolean splittable) {
+        this(k,-1, null, splittable);
     }
 
     
-    public POLoad(OperatorKey k, FileSpec lFile){
-        this(k,-1,lFile);
+    public POLoad(OperatorKey k, FileSpec lFile, boolean splittable){
+        this(k,-1,lFile, splittable);
     }
     
-    public POLoad(OperatorKey k, int rp, FileSpec lFile) {
+    public POLoad(OperatorKey k, int rp, FileSpec lFile, boolean splittable) {
         super(k, rp);
+        this.splittable = splittable;
     }
     
     /**
@@ -182,4 +185,12 @@
         this.pc = pc;
     }
 
+
+    /**
+     * @return whether the file spec is splittable
+     */
+    public boolean isSplittable() {
+        return splittable;
+    }
+
 }

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+
+public class POStream extends PhysicalOperator {
+    private static final long serialVersionUID = 2L;
+
+    private String executableManagerStr;            // String representing ExecutableManager to use
+    private ExecutableManager executableManager;    // ExecutableManager to use 
+    private StreamingCommand command;               // Actual command to be run
+    private StreamingCommand originalCommand;       // Original command
+    private StreamingCommand optimizedCommand;      // Optimized command
+    private Properties properties;
+
+    private boolean initialized = false;
+    
+    private BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+
+    private BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+
+    private boolean allInputFromPredecessorConsumed = false;
+
+    private boolean allOutputFromBinaryProcessed = false;
+
+    public POStream(OperatorKey k, ExecutableManager executableManager, 
+                      StreamingCommand command, Properties properties) {
+        super(k);
+        this.executableManagerStr = executableManager.getClass().getName();
+        this.originalCommand = command;
+        this.command = command;
+        this.properties = properties;
+
+        // Setup streaming-specific properties
+        if (command.getShipFiles()) {
+            parseShipCacheSpecs(command.getShipSpecs(), 
+                                properties, "pig.streaming.ship.files");
+        }
+        parseShipCacheSpecs(command.getCacheSpecs(), 
+                            properties, "pig.streaming.cache.files");
+    }
+    
+    private static void parseShipCacheSpecs(List<String> specs, 
+            Properties properties, String property) {
+        
+        String existingValue = properties.getProperty(property, "");
+        if (specs == null || specs.size() == 0) {
+            return;
+        }
+        
+        // Setup streaming-specific properties
+        StringBuffer sb = new StringBuffer();
+        Iterator<String> i = specs.iterator();
+        // first append any existing value
+        if(!existingValue.equals("")) {
+            sb.append(existingValue);
+            if (i.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        while (i.hasNext()) {
+            sb.append(i.next());
+            if (i.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        properties.setProperty(property, sb.toString());        
+    }
+
+    public Properties getShipCacheProperties() {
+        return properties;
+    }
+    
+    /**
+     * Get the {@link StreamingCommand} for this <code>POStream</code>.
+     * @return the {@link StreamingCommand} for this <code>POStream</code>
+     */
+    public StreamingCommand getCommand() {
+        return command;
+    }
+    
+    /**
+     * Set the optimized {@link HandleSpec} for the given {@link Handle} of 
+     * this <code>POStream</code>.
+     * 
+     * @param handle <code>Handle</code> to optimize
+     * @param spec optimized specification for the handle
+     */
+    public void setOptimizedSpec(Handle handle, String spec) {
+        if (optimizedCommand == null) {
+            optimizedCommand = (StreamingCommand)command.clone();
+        }
+        
+        if (handle == Handle.INPUT) {
+            HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
+            streamInputSpec.setSpec(spec);
+        } else if (handle == Handle.OUTPUT) {
+            HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
+            streamOutputSpec.setSpec(spec);
+        }
+        
+        command = optimizedCommand;
+    }
+    
+    /**
+     * Revert the optimized {@link StreamingCommand} for this 
+     * <code>POStream</code> to the original command.
+     */
+    public void revertOptimizedCommand(Handle handle) {
+        if (optimizedCommand == null) {
+            return;
+        }
+
+        if (handle == Handle.INPUT &&
+            !command.getInputSpec().equals(originalCommand.getInputSpec())) {
+            command.setInputSpec(originalCommand.getInputSpec());
+        } else if (handle == Handle.OUTPUT && 
+                   !command.getOutputSpec().equals(
+                           originalCommand.getOutputSpec())) {
+            command.setOutputSpec(originalCommand.getOutputSpec());
+        }
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // The POStream Operator works with ExecutableManager to
+        // send input to the streaming binary and to get output
+        // from it. To achieve tuple-oriented behavior, two queues
+        // are used - one for output from the binary and one for 
+        // input to the binary. In each getNext() call:
+        // 1) If there is no more output expected from the binary, an EOP is
+        // sent to the successor
+        // 2) If there is any output from the binary in the queue, it is passed
+        // down to the successor
+        // 3) If neither of these is true and it is possible to
+        // send input to the binary, the next tuple is fetched from the
+        // predecessor and passed to the binary
+        try {
+            // if we are being called AFTER all output from the streaming 
+            // binary has already been sent to us then just return EOP
+            // The "allOutputFromBinaryProcessed" flag is set when we see
+            // an EOS (End of Stream output) from streaming binary
+            if(allOutputFromBinaryProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            
+            // if we are here AFTER all map() calls have been completed
+            // AND AFTER we have processed all possible input to be sent to the
+            // streaming binary, then all we want to do is read output from
+            // the streaming binary
+            if(allInputFromPredecessorConsumed) {
+                Result r = binaryOutputQueue.take();
+                if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also since we are being called
+                    // after all input from predecessor has been processed
+                    // it means we got here from a call from close() in
+                    // map or reduce. So once we send this EOP down, 
+                    // getNext() in POStream should never be called. So
+                    // we don't need to set any flag noting we saw all output
+                    // from binary
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return(r);
+            }
+            
+            // if we are here, we haven't consumed all input to be sent
+            // to the streaming binary - check if we are being called
+            // from close() on the map or reduce
+            if(this.parentPlan.endOfAllInput) {
+                Result r = getNextHelper(t);
+                if(r.returnStatus == POStatus.STATUS_EOP) {
+                    // we have now seen *ALL* possible input
+                    // check if we ever had any real input
+                    // in the course of the map/reduce - if we did
+                    // then "initialized" will be true. If not, just
+                    // send EOP down.
+                    if(initialized) {
+                        // signal End of ALL input to the Executable Manager's 
+                        // Input handler thread
+                        binaryInputQueue.put(r);
+                        // note this state for future calls
+                        allInputFromPredecessorConsumed  = true;
+                        // look for output from binary
+                        r = binaryOutputQueue.take();
+                        if(r.returnStatus == POStatus.STATUS_EOS) {
+                            // If we received EOS, it means all output
+                            // from the streaming binary has been sent to us
+                            // So we can send an EOP to the successor in
+                            // the pipeline. Also since we are being called
+                            // after all input from predecessor has been processed
+                            // it means we got here from a call from close() in
+                            // map or reduce. So once we send this EOP down, 
+                            // getNext() in POStream should never be called. So
+                            // we don't need to set any flag noting we saw all output
+                            // from binary
+                            r.returnStatus = POStatus.STATUS_EOP;
+                        }
+                    }
+                    
+                } else if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also we are being called
+                    // from close() in map or reduce (this is so because
+                    // only then this.parentPlan.endOfAllInput is true).
+                    // So once we send this EOP down, getNext() in POStream
+                    // should never be called. So we don't need to set any 
+                    // flag noting we saw all output from binary
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return r;
+            } else {
+                // we are not being called from close() - so
+                // we must be called from either map() or reduce()
+                // get the next Result from helper
+                Result r = getNextHelper(t);
+                if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline and also note this condition
+                    // for future calls
+                    r.returnStatus = POStatus.STATUS_EOP;
+                    allOutputFromBinaryProcessed  = true;
+                }
+                return r;
+            }
+            
+        } catch(Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+    }
+
+    public Result getNextHelper(Tuple t) throws ExecException {
+        try {
+            synchronized(this) {
+                while(true) {
+                    // if there is something in binary output Queue
+                    // return it
+                    if(!binaryOutputQueue.isEmpty()) {
+                        Result res = binaryOutputQueue.take();
+                        return res;
+                    }
+                    
+                    // check if we can write tuples to 
+                    // input of the process
+                    if(binaryInputQueue.remainingCapacity() > 0) {
+                        
+                        Result input = processInput();
+                        if(input.returnStatus == POStatus.STATUS_EOP || 
+                                input.returnStatus == POStatus.STATUS_ERR) {
+                            return input;
+                        } else {
+                            // we have a tuple to send as input
+                            // Only when we see the first tuple which can
+                            // be sent as input to the binary do we want
+                            // to initialize the ExecutableManager and set
+                            // up the streaming binary - this is required for 
+                            // unions arising from a JOIN, where one of the map
+                            // tasks may never have any input to send to the
+                            // binary - so we initialize only if we have to.
+                            // initialize the ExecutableManager once
+                            if(!initialized) {
+                                // set up the executableManager
+                                executableManager = 
+                                    (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr);
+                                
+                                try {
+                                    executableManager.configure(this);
+                                    executableManager.run();
+                                } catch (IOException e) {
+                                    throw new ExecException("Error while running streaming binary", e);
+                                }            
+                                initialized = true;
+                            }
+                            
+                            // send this input to the streaming
+                            // process
+                            binaryInputQueue.put(input);
+                        }
+                        
+                    } else {
+                        
+                        // wait for either input to be available
+                        // or output to be consumed
+                        while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty())
+                            wait();
+                        
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+    }
+    
+    public String toString() {
+        return "POStream" + "[" + command.toString() + "]" + " - " + mKey.toString();
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#visit(org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor)
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitStream(this);
+        
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+       return toString(); 
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleOutputs()
+     */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Finish the stream by closing the underlying {@link ExecutableManager}.
+     */
+    public void finish() throws IOException {
+        executableManager.close();
+    }
+
+    /**
+     * @return the Queue which has input to binary
+     */
+    public BlockingQueue<Result> getBinaryInputQueue() {
+        return binaryInputQueue;
+    }
+
+    /**
+     * @return the Queue which has output from binary
+     */
+    public BlockingQueue<Result> getBinaryOutputQueue() {
+        return binaryOutputQueue;
+    }
+}
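
For reference, here is a minimal, self-contained sketch (not part of this
commit) of the two-queue handshake that getNext() relies on. The Res class
and status constants below are simplified stand-ins for the Pig Result and
POStatus classes, and a plain thread plays the role of the binary driven by
the ExecutableManager: tuples go into one bounded queue, results come back
on the other, and the binary's final EOS is translated into EOP exactly as
in getNext() above:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class TwoQueueHandshakeDemo {
        // Simplified stand-ins for POStatus constants (values arbitrary)
        static final byte STATUS_OK = 0, STATUS_EOP = 1, STATUS_EOS = 2;

        // Simplified stand-in for Result
        static class Res {
            byte status; Object data;
            Res(byte status, Object data) { this.status = status; this.data = data; }
        }

        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<Res> binaryInput = new ArrayBlockingQueue<Res>(1);
            final BlockingQueue<Res> binaryOutput = new ArrayBlockingQueue<Res>(1);

            // Plays the streaming binary: echo input until EOP, then emit EOS
            Thread binary = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            Res in = binaryInput.take();
                            if (in.status == STATUS_EOP) {
                                binaryOutput.put(new Res(STATUS_EOS, null));
                                return;
                            }
                            binaryOutput.put(new Res(STATUS_OK, in.data));
                        }
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            binary.start();

            // Plays map(): feed a tuple, drain the echoed result
            for (String t : new String[] { "a", "b" }) {
                binaryInput.put(new Res(STATUS_OK, t));
                System.out.println("output: " + binaryOutput.take().data);
            }

            // Plays close(): signal end of all input, then translate the
            // binary's EOS into EOP for the successor in the pipeline
            binaryInput.put(new Res(STATUS_EOP, null));
            Res last = binaryOutput.take();
            if (last.status == STATUS_EOS) {
                last.status = STATUS_EOP;
            }
            binary.join();
        }
    }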

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Thu Sep  4 14:25:41 2008
@@ -137,17 +137,13 @@
                 while(true){
                     if(reporter!=null) reporter.progress();
                     res = inputs.get(ind).getNext(t);
-                    if(res.returnStatus == POStatus.STATUS_NULL)
-                        continue;
-
                     lastInd = ind + 1;
 
-                    if(res.returnStatus == POStatus.STATUS_ERR)
-                        return new Result();
-
-                    if (res.returnStatus == POStatus.STATUS_OK)
+                    if(res.returnStatus == POStatus.STATUS_OK || 
+                            res.returnStatus == POStatus.STATUS_NULL || res.returnStatus == POStatus.STATUS_ERR) {
                         return res;
-                    
+                    }
+
                     if (res.returnStatus == POStatus.STATUS_EOP) {
                         done.set(ind);
                         break;
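
Net effect of the POUnion hunk above: previously a STATUS_NULL result
restarted the inner loop on the same input and a STATUS_ERR was collapsed
into a fresh empty Result; now OK, NULL and ERR results are all forwarded
unchanged to the successor, and only EOP still marks the current input as
done. A compressed restatement of the new loop body (identifiers as in the
hunk; the enclosing loop is omitted):

    Result res = inputs.get(ind).getNext(t);
    lastInd = ind + 1;
    if (res.returnStatus == POStatus.STATUS_OK
            || res.returnStatus == POStatus.STATUS_NULL
            || res.returnStatus == POStatus.STATUS_ERR) {
        return res;   // forward data, null records and errors alike
    }
    if (res.returnStatus == POStatus.STATUS_EOP) {
        done.set(ind);   // this input is exhausted; try the next one
        break;
    }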

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link HadoopExecutableManager} is a specialization of 
+ * {@link ExecutableManager} that provides HDFS-specific support for 
+ * secondary outputs, task logs etc.
+ * 
+ * <code>HadoopExecutableManager</code> copies secondary outputs of the 
+ * managed process to HDFS and also persists the task logs there. 
+ */
+public class HadoopExecutableManager extends ExecutableManager {
+    // The part-<partition> file name, similar to Hadoop's outputs
+    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+    static {
+      NUMBER_FORMAT.setMinimumIntegerDigits(5);
+      NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
+    static String getOutputName(int partition) {
+      return "part-" + NUMBER_FORMAT.format(partition);
+    }
+
+    JobConf job;
+    
+    String scriptOutputDir;
+    String scriptLogDir;
+    String taskId;
+    
+    FSDataOutputStream errorStream;
+    
+    boolean writeHeaderFooter = false;
+    
+    public HadoopExecutableManager() {}
+    
+    public void configure(POStream stream) 
+    throws IOException, ExecException {
+        super.configure(stream);
+        
+        // Chmod +x the executable
+        File executable = new File(command.getExecutable());
+        if (executable.isAbsolute()) {
+            // an absolute path was not shipped with the job; we don't own it,
+            // so hope it is already executable ...
+        } else {
+            try {
+                FileUtil.chmod(executable.toString(), "a+x");
+            } catch (InterruptedException ie) {
+                throw new ExecException(ie);
+            }
+        }
+        
+        // Save a copy of the JobConf
+        job = PigMapReduce.sJobConf;
+        
+        // Save the output directory for the Pig Script
+        scriptOutputDir = job.get("pig.streaming.task.output.dir");
+        scriptLogDir = job.get("pig.streaming.log.dir", "_logs");
+        
+        // Save the taskid
+        taskId = job.get("mapred.task.id");
+    }
+    
+    protected void exec() throws IOException {
+        // Create the HDFS file for the stderr of the task, if necessary
+        if (writeErrorToHDFS(command.getLogFilesLimit(), taskId)) {
+            try {
+                Path errorFile = 
+                    new Path(new Path(scriptLogDir, command.getLogDir()), taskId);
+                errorStream = 
+                    errorFile.getFileSystem(job).create(errorFile);
+            } catch (IOException ie) {
+                // Don't fail the task if we couldn't save its stderr on HDFS
+                System.err.println("Failed to create stderr file of task: " +
+                                   taskId + " in HDFS at " + scriptLogDir + 
+                                   " with " + ie);
+                errorStream = null;
+            }
+        }
+        
+        // Header for stderr file of the task
+        writeDebugHeader();
+
+        // Exec the command ...
+        super.exec();
+    }
+
+    public void close() throws IOException {
+        try {
+            super.close();
+
+            // Copy the secondary outputs of the task to HDFS
+            Path scriptOutputDir = new Path(this.scriptOutputDir);
+            FileSystem fs = scriptOutputDir.getFileSystem(job);
+            List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+            if (outputSpecs != null) {
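+                // i starts at 1: handle-spec 0 is the primary output, not a secondary one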
+                for (int i=1; i < outputSpecs.size(); ++i) {
+                    String fileName = outputSpecs.get(i).getName();
+                    try {
+                        int partition = job.getInt("mapred.task.partition", -1);
+                        fs.copyFromLocalFile(false, true, new Path(fileName), 
+                                             new Path(
+                                                     new Path(scriptOutputDir, 
+                                                              fileName), 
+                                                     getOutputName(partition))
+                                            );
+                    } catch (IOException ioe) {
+                        System.err.println("Failed to save secondary output '" + 
+                                           fileName + "' of task: " + taskId +
+                                           " with " + ioe);
+                        throw ioe;
+                    }
+                }
+            }
+        } finally {
+            // Footer for stderr file of the task
+            writeDebugFooter();
+            
+            // Close the stderr file on HDFS
+            if (errorStream != null) {
+                errorStream.close();
+            }
+        }
+    }
+
+    /**
+     * Should the stderr data of this task be persisted on HDFS?
+     * 
+     * @param limit maximum number of tasks whose stderr log-files are persisted
+     * @param taskId id of the task
+     * @return <code>true</code> if stderr data of task should be persisted on 
+     *         HDFS, <code>false</code> otherwise
+     */
+    private boolean writeErrorToHDFS(int limit, String taskId) {
+        if (command.getPersistStderr()) {
+            // These are hard-coded begin/end offsets into a Hadoop *taskid*
+            int beginIndex = 25, endIndex = 31;   
+
+            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            return tipId < limit;
+        }
+        return false;
+    }
+    
+    protected void processError(String error) {
+        super.processError(error);
+        
+        try {
+            if (errorStream != null) {
+                errorStream.writeBytes(error);
+            }
+        } catch (IOException ioe) {
+            super.processError("Failed to save error logs to HDFS with: " + 
+                               ioe);
+        }
+    }
+
+    private void writeDebugHeader() {
+        processError("===== Task Information Header =====");
+
+        processError("\nCommand: " + command);
+        processError("\nStart time: " + new Date(System.currentTimeMillis()));
+        if (job.getBoolean("mapred.task.is.map", false)) {
+            processError("\nInput-split file: " + job.get("map.input.file"));
+            processError("\nInput-split start-offset: " + 
+                         job.getLong("map.input.start", -1));
+            processError("\nInput-split length: " + 
+                         job.getLong("map.input.length", -1));
+        }
+        processError("\n=====          * * *          =====\n");
+    }
+    
+    private void writeDebugFooter() {
+        processError("===== Task Information Footer =====");
+
+        processError("\nEnd time: " + new Date(System.currentTimeMillis()));
+        processError("\nExit code: " + exitCode);
+
+        List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
+        HandleSpec inputSpec = 
+            (inputSpecs != null) ? inputSpecs.get(0) : null;
+        if (inputSpec == null || 
+            !inputSpec.getSpec().contains("BinaryStorage")) {
+            processError("\nInput records: " + inputRecords);
+        }
+        processError("\nInput bytes: " + inputBytes + " bytes " +
+                    ((inputSpec != null) ? 
+                            "(" + inputSpec.getName() + " using " + 
+                                inputSpec.getSpec() + ")" 
+                            : ""));
+
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        HandleSpec outputSpec = 
+            (outputSpecs != null) ? outputSpecs.get(0) : null;
+        if (outputSpec == null || 
+            !outputSpec.getSpec().contains("BinaryStorage")) {
+            processError("\nOutput records: " + outputRecords);
+        }
+        processError("\nOutput bytes: " + outputBytes + " bytes " +
+                     ((outputSpec != null) ? 
+                         "(" + outputSpec.getName() + " using " + 
+                             outputSpec.getSpec() + ")" 
+                         : ""));
+        if (outputSpecs != null) {
+            for (int i=1; i < outputSpecs.size(); ++i) {
+                HandleSpec spec = outputSpecs.get(i);
+                processError("\n           " + new File(spec.getName()).length() 
+                             + " bytes using " + spec.getSpec());
+            }
+        }
+
+        processError("\n=====          * * *          =====\n");
+    }
+}
+
+    
\ No newline at end of file
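
A note on the hard-coded offsets in writeErrorToHDFS(): they appear to
assume the "task_<timestamp>_<jobnum>_<m|r>_<tasknum>_<attempt>" task-id
layout of Hadoop at the time, in which characters 25-30 hold the zero-padded
task number. A tiny sketch under that assumption (the sample id below is
made up for illustration):

    public class TaskIdOffsetDemo {
        public static void main(String[] args) {
            // Hypothetical task-id in the assumed layout
            String taskId = "task_200809041425_0001_m_000005_0";
            int beginIndex = 25, endIndex = 31;  // same offsets as writeErrorToHDFS()
            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
            // Prints 5; stderr is persisted only while tipId < the log-files limit
            System.out.println(tipId);
        }
    }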

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Thu Sep  4 14:25:41 2008
@@ -53,8 +53,11 @@
 //import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
 //import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.WrappedIOException;
 
@@ -102,10 +105,19 @@
      */
     private Map<String, FuncSpec> definedFunctions = new HashMap<String, FuncSpec>();
     
+    /**
+     * a table mapping names to streaming commands.
+     */
+    private Map<String, StreamingCommand> definedCommands = 
+        new HashMap<String, StreamingCommand>();
+
     private static ArrayList<String> packageImportList = new ArrayList<String>();
 
     public boolean debug = true;
     
+    // List of paths skipped for automatic shipping
+    List<String> skippedShipPaths = new ArrayList<String>();
+    
     public PigContext() {
         this(ExecType.MAPREDUCE, new Properties());
     }
@@ -295,6 +307,22 @@
     }
 
     /**
+     * Defines an alias for the given streaming command.
+     * 
+     * This is useful for complicated streaming command specs.
+     * 
+     * @param alias - the new command alias to define.
+     * @param command - the command; <code>null</code> removes any existing
+     *                  definition for the alias
+     */
+    public void registerStreamCmd(String alias, StreamingCommand command) {
+        if (command == null) {
+            definedCommands.remove(alias);
+        } else {
+            definedCommands.put(alias, command);
+        }
+    }
+
+    /**
      * Returns the type of execution currently in effect.
      * 
      * @return current execution type
@@ -435,9 +463,48 @@
             return instantiateFuncFromSpec(alias);
     }
 
+    /**
+     * Get the {@link StreamingCommand} for the given alias.
+     * 
+     * @param alias the alias for the <code>StreamingCommand</code>
+     * @return <code>StreamingCommand</code> for the alias
+     */
+    public StreamingCommand getCommandForAlias(String alias) {
+        return definedCommands.get(alias);
+    }
+    
     public void setExecType(ExecType execType) {
         this.execType = execType;
     }
+    
+    /**
+     * Create a new {@link ExecutableManager} depending on the ExecType.
+     * 
+     * @return a new {@link ExecutableManager} depending on the ExecType
+     * @throws ExecException
+     */
+    public ExecutableManager createExecutableManager() throws ExecException {
+        ExecutableManager executableManager = null;
+
+        switch (execType) {
+            case LOCAL:
+            {
+                executableManager = new ExecutableManager();
+            }
+            break;
+            case MAPREDUCE: 
+            {
+                executableManager = new HadoopExecutableManager();
+            }
+            break;
+            default:
+            {
+                throw new ExecException("Unknown execType: " + execType);
+            }
+        }
+        
+        return executableManager;
+    }
 
     public FuncSpec getFuncSpecFromAlias(String alias) {
         FuncSpec funcSpec;
@@ -446,4 +513,25 @@
         else
             return null;
     }
+
+    /**
+     * Add a path to be skipped while automatically shipping binaries for 
+     * streaming.
+     *  
+     * @param path path to be skipped
+     */
+    public void addPathToSkip(String path) {
+        skippedShipPaths.add(path);
+    }
+    
+    /**
+     * Get paths which are to be skipped while automatically shipping 
+     * binaries for streaming.
+     * 
+     * @return paths which are to be skipped while automatically shipping 
+     *         binaries for streaming
+     */
+    public List<String> getPathsToSkip() {
+        return skippedShipPaths;
+    }
 }
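
Taken together, the PigContext additions form a small registry plus factory:
DEFINEd streaming commands are kept by alias, ship-skip paths accumulate,
and createExecutableManager() picks the manager matching the ExecType. A
hypothetical helper using only the methods added in this change (the alias,
the skip path and the caller-supplied command are illustrative):

    import java.util.List;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.streaming.ExecutableManager;
    import org.apache.pig.impl.streaming.StreamingCommand;

    public class StreamingSetupSketch {
        // cmd is built elsewhere (e.g. by the DEFINE handler); we only wire it up
        static ExecutableManager wireUp(PigContext ctx, StreamingCommand cmd)
                throws ExecException {
            ctx.registerStreamCmd("my_cmd", cmd);  // DEFINE my_cmd `...`
            ctx.addPathToSkip("/usr/bin");         // don't auto-ship system binaries

            StreamingCommand resolved = ctx.getCommandForAlias("my_cmd");
            List<String> skipped = ctx.getPathsToSkip();
            System.out.println("resolved=" + resolved + ", skipping=" + skipped);

            // LOCAL -> ExecutableManager, MAPREDUCE -> HadoopExecutableManager
            return ctx.createExecutableManager();
        }
    }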

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Thu Sep  4 14:25:41 2008
@@ -160,4 +160,9 @@
         sb.append(carray, 0, cbuff.position());
         return sb.toString();
     }
+
+    public void close() throws IOException {
+        super.close();
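+        // also close the wrapped stream; super.close() does not do this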
+        in.close();
+    }
 }