You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/04 23:25:43 UTC

svn commit: r692253 [2/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/ex...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Thu Sep  4 14:25:41 2008
@@ -414,6 +414,26 @@
         return elem.exists() || globMatchesFiles(elem, store);
     }
 
+    public static boolean isFile(String filename, PigContext context)
+    throws IOException {
+        return !isDirectory(filename, context.getDfs());
+    }
+
+    public static boolean isFile(String filename, DataStorage store)
+    throws IOException {
+        return !isDirectory(filename, store);
+    }
+
+    public static boolean isDirectory(String filename, PigContext context)
+    throws IOException {
+        return isDirectory(filename, context.getDfs());
+    }
+
+    public static boolean isDirectory(String filename, DataStorage store)
+    throws IOException {
+        ElementDescriptor elem = store.asElement(filename);
+        return (elem instanceof ContainerDescriptor);
+    }
 
     private static boolean globMatchesFiles(ElementDescriptor elem,
                                             DataStorage fs)

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu Sep  4 14:25:41 2008
@@ -32,6 +32,7 @@
 
 public class LOLoad extends LogicalOperator {
     private static final long serialVersionUID = 2L;
+    protected boolean splittable = true;
 
     private FileSpec mInputFileSpec;
     private LoadFunc mLoadFunc;
@@ -51,11 +52,12 @@
      * 
      */
     public LOLoad(LogicalPlan plan, OperatorKey key, FileSpec inputFileSpec,
-            URL schemaFile) throws IOException {
+            URL schemaFile, boolean splittable) throws IOException {
         super(plan, key);
 
         mInputFileSpec = inputFileSpec;
         mSchemaFile = schemaFile;
+        this.splittable = splittable;
 
          try { 
              mLoadFunc = (LoadFunc)
@@ -139,6 +141,10 @@
         this.mEnforcedSchema = enforcedSchema;
     }
 
+    public boolean isSplittable() {
+        return splittable;
+    }
+
     @Override
     public byte getType() {
         return DataType.BAG ;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Sep  4 14:25:41 2008
@@ -173,6 +173,10 @@
     protected void visit(LOLimit limOp) throws VisitorException {
         return;
     }
+    
+    protected void visit(LOStream stream) throws VisitorException {
+        return;
+    }
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Sep  4 14:25:41 2008
@@ -47,6 +47,8 @@
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.BagFactory;
@@ -139,7 +141,7 @@
 	        storePlan.add(store);
 	        storePlan.add(input);
 	        storePlan.connect(input, store);
-	        attachPlan(storePlan, input, readFrom);
+	        attachPlan(storePlan, input, readFrom, new HashMap<LogicalOperator, Boolean>());
         } catch (ParseException pe) {
             throw new FrontendException(pe.getMessage());
         }
@@ -316,6 +318,130 @@
 	 	return mapAliasOp.get(alias);
 	 }
 
+     // Check and set files to be automatically shipped for the given StreamingCommand
+     // Auto-shipping rules:
+     // 1. If the command begins with either perl or python assume that the 
+     //    binary is the first non-quoted string it encounters that does not 
+     //    start with dash - subject to restrictions in (2).
+     // 2. Otherwise, attempt to ship the first string from the command line as 
+     //    long as it does not come from /bin, /usr/bin, /usr/local/bin.
+     //    It will determine that by scanning the path if an absolute path is 
+     //    provided or by executing "which". The paths can be made configurable 
+     //    via "set stream.skippath <paths>" option.
+     private static final String PERL = "perl";
+     private static final String PYTHON = "python";
+     private void checkAutoShipSpecs(StreamingCommand command, String[] argv) 
+     throws ParseException {
+     	// Candidate for auto-ship
+     	String arg0 = argv[0];
+     	
+     	// Check if command is perl or python ... if so use the first non-option
+     	// and non-quoted string as the candidate
+        if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
+            for (int i=1; i < argv.length; ++i) {
+            	if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
+            		checkAndShip(command, argv[i]);
+            		break;
+            	}
+            }
+        } else {
+        	// Ship the first argument if it can be ...
+        	checkAndShip(command, arg0);
+        }
+     }
+     
+     private void checkAndShip(StreamingCommand command, String arg) 
+     throws ParseException {
+     	// Don't auto-ship if it is an absolute path...
+     	if (arg.startsWith("/")) {
+     		return;
+     	}
+     	
+     	// $ which arg
+     	String argPath = which(arg);
+     	if (argPath != null && !inSkipPaths(argPath)) {
+     		try {
+     		    command.addPathToShip(argPath);
+     		} catch(IOException e) {
+                throw new ParseException(e.getMessage());
+            }
+     	}
+     	 
+     }
+
+     private boolean isQuotedString(String s) {
+     	return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\'');
+     }
+     
+     // Check if file is in the list of paths to be skipped
+     private boolean inSkipPaths(String file) {
+     	for (String skipPath : pigContext.getPathsToSkip()) {
+     		if (file.startsWith(skipPath)) {
+     			return true;
+     		}
+     	}
+        return false;
+     }
+
+
+     private static String which(String file) {
+        try {
+        	ProcessBuilder processBuilder = 
+        	    new ProcessBuilder(new String[] {"which", file});
+            Process process = processBuilder.start();
+    
+            BufferedReader stdout = 
+                new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String fullPath = stdout.readLine();
+
+            return (process.waitFor() == 0) ? fullPath : null;
+        } catch (Exception e) {}
+        return null;
+     }
+               
+     private static final char SINGLE_QUOTE = '\'';
+     private static final char DOUBLE_QUOTE = '"';
+     private static String[] splitArgs(String command) throws ParseException {
+        List<String> argv = new ArrayList<String>();
+
+        int beginIndex = 0;
+        
+        while (beginIndex < command.length()) {
+            // Skip spaces
+            while (Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+            
+            char delim = ' ';
+            char charAtIndex = command.charAt(beginIndex);
+            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+                delim = charAtIndex;
+            }
+            
+            int endIndex = command.indexOf(delim, beginIndex+1);
+            if (endIndex == -1) {
+                if (Character.isWhitespace(delim)) {
+                    // Reached end of command-line
+                    argv.add(command.substring(beginIndex));
+                    break;
+                } else {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+            }
+            
+            if (Character.isWhitespace(delim)) {
+                // Do not consume the space
+                argv.add(command.substring(beginIndex, endIndex));
+            } else {
+                argv.add(command.substring(beginIndex, endIndex+1));
+            }
+           
+            beginIndex = endIndex + 1;
+        }
+        
+        return argv.toArray(new String[argv.size()]);
+    }
 	 
 	 //BEGIN
 	 //I am maintaining state about the operators that should
@@ -371,8 +497,13 @@
         return aliases.get(op);
     }
 
-    public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException {
+    public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan, Map<LogicalOperator, Boolean> rootProcessed) throws ParseException {
         log.trace("Entering attachPlan");
+        if((rootProcessed.get(root) != null) && (rootProcessed.get(root))) {
+            log.trace("Root has been processed");
+            log.trace("Exiting attachPlan");
+            return;
+        }
         lp.add(root);
         log.debug("Added operator " + root + " to the logical plan " + lp);
         if(null == rootPlan.getPredecessors(root)) {
@@ -380,7 +511,8 @@
             return;
         }
         for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) {
-            attachPlan(lp, rootPred, rootPlan);
+            attachPlan(lp, rootPred, rootPlan, rootProcessed);
+            rootProcessed.put(rootPred, true);
             try {
                 lp.connect(rootPred, root);
                 log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
@@ -558,8 +690,17 @@
 TOKEN : { <MAP : "map"> }
 TOKEN : { <IS : "is"> }
 TOKEN : { <NULL : "null"> }
+TOKEN : { <STREAM : "stream"> }
+TOKEN : { <THROUGH : "through"> }
 TOKEN : { <STORE : "store"> }
-TOKEN : { <LIMIT : "limit"> }
+TOKEN : { <SHIP: "ship"> }
+TOKEN : { <CACHE: "cache"> }
+TOKEN : { <INPUT: "input"> }
+TOKEN : { <OUTPUT: "output"> }
+TOKEN : { <ERROR: "stderr"> }
+TOKEN : { <STDIN: "stdin"> }
+TOKEN : { <STDOUT: "stdout"> }
+TOKEN : { <LIMIT: "limit"> }
 
 TOKEN:
 {
@@ -594,6 +735,7 @@
 )*
 "'"> }
 
+TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
 // Pig has special variables starting with $
 TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
 
@@ -628,6 +770,7 @@
             roots.add(op);
         }
         
+        Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
         for(LogicalOperator op: roots) {
             //At this point we have a logical plan for the pig statement
             //In order to construct the entire logical plan we need to traverse
@@ -638,7 +781,8 @@
 
             LogicalPlan rootPlan = aliases.get(op);
             if(null != rootPlan) {
-                attachPlan(lp, op, rootPlan);
+                attachPlan(lp, op, rootPlan, rootProcessed);
+                rootProcessed.put(op, true);
             }
         }
 		
@@ -791,6 +935,25 @@
 |   (<JOIN> op = JoinClause(lp))
 |	(<UNION> op = UnionClause(lp))
 |	(<FOREACH> op = ForEachClause(lp))
+|   (<STREAM> op = StreamClause(lp) 
+        [ <AS> 
+        (
+            LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
+            {
+                SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
+                op.setSchema(schema); 
+                op.setCanonicalNames(); 
+                log.info("Stream as schema()"+ schema);
+            } 
+        | fs = AtomSchema() 
+            {
+                schema = new Schema(fs);
+                op.setSchema(schema);
+                log.info("Stream as atomschema()" + schema);
+            }
+        ) 
+        ]
+    )
 |   (<STORE> op = StoreClause(lp))
 	)
     [<PARALLEL> t2=<INTEGER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
@@ -800,12 +963,14 @@
 
 LogicalOperator LoadClause(LogicalPlan lp) : 
 {
-	Token t1, t2; 
+	Token t1, t2, t3; 
 	String filename; 
 	String funcName,funcArgs =null;
 	FuncSpec funcSpec = null;
 	String funcSpecAsString = null; 
 	LOLoad lo=null; 
+    String splitBy;
+    boolean splittable = true;
 	log.trace("Entering LoadClause");
 }
 {
@@ -818,13 +983,24 @@
 			log.debug("LoadClause: funcSpec = " + funcSpec);
 		}
 		)?
+		(
+		<SPLIT> <BY> t3 = <QUOTEDSTRING>
+		{
+			splitBy = unquote(t3.image);
+			if (splitBy.equalsIgnoreCase("file")) {
+				splittable = false;
+			}
+		}
+		)?
 	)
 	{
-		if (funcSpec == null){
-			funcSpec = new FuncSpec(PigStorage.class.getName());
+		if (funcSpecAsString == null){
+			funcSpecAsString = PigStorage.class.getName();
+			funcSpec = new FuncSpec(funcSpecAsString);
+			log.debug("LoadClause: funcSpec = " + funcSpec);
 		}
 
-		lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null);
+		lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null, splittable);
 		lp.add(lo);
 		log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");	
 		
@@ -1488,11 +1664,13 @@
                 the list of foreach plans
                 */
 
+                Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
                 for(LogicalOperator project: mapProjectInputs.keySet()) {
                     for(LogicalOperator projectInput: mapProjectInputs.get(project)) {
                         generatePlan.add(projectInput);
                         generatePlan.connect(projectInput, project);
-                        attachPlan(generatePlan, projectInput, foreachPlan);
+                        attachPlan(generatePlan, projectInput, foreachPlan, rootProcessed);
+                        rootProcessed.put(projectInput, true);
                     }
                 }
             }
@@ -1517,15 +1695,104 @@
 	}
 }
 
-LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token t1; String functionName, functionArgs;}
+LogicalOperator StreamClause(LogicalPlan lp): 
+{
+	LogicalOperator input; 
+	StreamingCommand command;
+}
+{
+	input = NestedExpr(lp)	
+	
+	<THROUGH> command = Command()
+	{
+		LOStream loStream = new LOStream(lp, new OperatorKey(scope, getNextId()), input, 
+                    pigContext.createExecutableManager(), command);
+        //addAlias(input.getAlias(), input);
+        lp.add(loStream);
+        lp.connect(input, loStream);
+        return loStream;
+	}
+}
+
+StreamingCommand Command(): {Token t; StreamingCommand command;}
+{
+	t = <EXECCOMMAND>
+	{
+		String[] argv = splitArgs(unquote(t.image));
+		command = new StreamingCommand(pigContext, argv);
+        checkAutoShipSpecs(command, argv);
+		return command;
+	}
+	|
+	t = <IDENTIFIER>
+	{
+		command = pigContext.getCommandForAlias(t.image);
+		if (command == null) {
+			throw new ParseException("Undefined command-alias: " + t.image + 
+			                         " used as stream operator");
+		}
+
+		return command;
+	}
+}
+
+LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token cmd; String functionName, functionArgs;}
 {
     t = <IDENTIFIER>
     (
+    ( 
+        cmd = <EXECCOMMAND>
+        {
+            StreamingCommand command = 
+               new StreamingCommand(pigContext, splitArgs(unquote(cmd.image)));
+            String[] paths;
+            StreamingCommand.HandleSpec[] handleSpecs;
+        }
+        (
+            <SHIP> "(" paths = PathList() ")" 
+            {
+                if (paths.length == 0) {
+                	command.setShipFiles(false);
+                } else {
+                    for (String path : paths) {
+                    	try {
+                            command.addPathToShip(path);
+                        } catch(IOException e) {
+                            throw new ParseException(e.getMessage());
+                        }
+                    }
+                }
+            }
+            |
+            <CACHE> "(" paths = PathList() ")"
+            {
+                for (String path : paths) {
+                    try {
+                        command.addPathToCache(path);
+                    } catch(IOException e) {
+                        throw new ParseException(e.getMessage());
+                    }
+                }
+            }
+            |
+            <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
+            |
+            <OUTPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.OUTPUT) ")"
+            |
+            <ERROR> "(" ErrorSpec(command, t.image) ")"
+        )*
+        {
+            pigContext.registerStreamCmd(t.image, command); 
+        }
+    )
+    |
+    (
         functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
         {
             pigContext.registerFunction(t.image, new FuncSpec(functionName + "(" + functionArgs + ")"));
         }
     )
+    )
     {
         // Return the dummy LODefine
         LogicalOperator lo = new LODefine(lp, new OperatorKey(scope, getNextId()));
@@ -1534,6 +1801,83 @@
     }
 }
 
+String[] PathList() : {Token t; List<String> pathList = new ArrayList<String>();}
+{
+    (
+    (
+    t = <QUOTEDSTRING> {pathList.add(unquote(t.image));}
+    ( "," t = <QUOTEDSTRING> {pathList.add(unquote(t.image));} )*
+    )
+    | {}
+    )
+    {return pathList.toArray(new String[pathList.size()]);}
+}
+
+void InputOutputSpec(StreamingCommand command, StreamingCommand.Handle handle):
+{
+    String stream, deserializer;
+    StreamingCommand.HandleSpec[] handleSpecs;
+    String functionName = "PigStorage", functionArgs="";
+} 
+{
+    stream = CommandStream() 
+    [
+        <USING> functionName = QualifiedFunction() 
+        [
+            "(" functionArgs = StringList() ")"
+        ]
+    ]
+    {
+        deserializer = functionName + "(" + functionArgs + ")";
+        command.addHandleSpec(handle, 
+                              new HandleSpec(stream, deserializer)
+                             );
+    }
+    (
+        "," 
+        stream = CommandStream() 
+        [
+            <USING> functionName = QualifiedFunction() 
+            [
+                "(" functionArgs = StringList() ")"
+            ]
+        ] 
+        {
+            deserializer = functionName + "(" + functionArgs + ")";
+            command.addHandleSpec(handle, 
+                                  new HandleSpec(stream, deserializer)
+                                 );
+        }
+    )* 
+}
+
+String CommandStream(): {Token t;}
+{
+    t = <STDIN>
+    {return "stdin";}
+    |
+    t = <STDOUT>
+    {return "stdout";}
+    |
+    t = <QUOTEDSTRING>
+    {return unquote(t.image);}
+}
+
+void ErrorSpec(StreamingCommand command, String alias): {Token t1, t2; int limit = StreamingCommand.MAX_TASKS;}
+{
+	(
+	t1 = <QUOTEDSTRING>
+	(<LIMIT> t2 = <INTEGER> {limit = Integer.parseInt(t2.image);})?
+	{
+		command.setLogDir(unquote(t1.image));
+		command.setLogFilesLimit(limit);
+	}
+	)
+	|
+	{
+        command.setLogDir(alias);
+	}
+}
 LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; 
                                                 String functionName, functionArgs;}
 {

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultInputHandler} handles the input for the Pig-Streaming
+ * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input
+ * via its <code>stdin</code>.  
+ */
+public class DefaultInputHandler extends InputHandler {
+    
+    OutputStream stdin;
+    
+    public DefaultInputHandler() {
+        serializer = new PigStorage();
+    }
+    
+    public DefaultInputHandler(HandleSpec spec) {
+        serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+    }
+    
+    public InputType getInputType() {
+        return InputType.SYNCHRONOUS;
+    }
+    
+    public void bindTo(OutputStream os) throws IOException {
+        stdin = os;
+        super.bindTo(stdin);
+    }
+    
+    @Override
+    public synchronized void close(Process process) throws IOException {
+        if(!alreadyClosed) {
+            alreadyClosed = true;
+            super.close(process);
+            try {
+                stdin.flush();
+                stdin.close();
+                stdin = null;
+            } catch(IOException e) {
+	            // check if we got an exception because
+                // the process actually completed and we were
+                // trying to flush and close its stdin
+                if(process == null || process.exitValue() != 0) {
+                    // the process had not terminated normally 
+                    // throw the exception we got                    
+                    throw e;
+                }
+            }
+            
+        }
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output
+ * via its <code>stdout</code>.
+ */
+public class DefaultOutputHandler extends OutputHandler {
+    BufferedPositionedInputStream stdout;
+    
+    public DefaultOutputHandler() {
+        deserializer = new PigStorage();
+    }
+    
+    public DefaultOutputHandler(HandleSpec spec) {
+        deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+    }
+
+    public OutputType getOutputType() {
+        return OutputType.SYNCHRONOUS;
+    }
+
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
+        stdout = is;
+        super.bindTo(fileName, stdout, offset, end);
+    }
+
+    public synchronized void close() throws IOException {
+        if(!alreadyClosed) {
+            super.close();
+            stdout.close();
+            stdout = null;
+            alreadyClosed = true;
+        }
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.InputHandler.InputType;
+import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+
+/**
+ * {@link ExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ * 
+ * The <code>ExecutableManager</code> is responsible for startup/teardown of
+ * the external process and also for managing it. It feeds input records to the
+ * executable via its <code>stdin</code>, collects the output records from
+ * the <code>stdout</code> and also diagnostic information from the
+ * <code>stderr</code>.
+ */
+public class ExecutableManager {
+    // Logger for this class
+    private static final Log LOG = LogFactory.getLog(ExecutableManager.class
+            .getName());
+    // Exit status which close() treats as a successful run
+    private static final int SUCCESS = 0;
+    // Name of the environment variable extended by setupEnvironment()
+    private static final String PATH = "PATH";
+    // Shell used by exec() to launch the streaming command
+    private static final String BASH = "bash";
+    // Shared end-of-stream result, sent to POStream once the process has
+    // exited with status 0
+    private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
+    
+    protected StreamingCommand command; // Streaming command to be run
+    String argvAsString; // Command arguments joined by spaces (see configure())
+
+    Process process; // Handle to the process; null until exec() has run
+    protected int exitCode = -127; // Exit code of the process (placeholder until waitFor() returns)
+
+    protected DataOutputStream stdin; // stdin of the process
+    ProcessInputThread stdinThread; // thread to send input to process
+    
+    ProcessOutputThread stdoutThread; // thread to get process stdout
+    InputStream stdout; // stdout of the process
+    
+    ProcessErrorThread stderrThread; // thread to get process stderr
+    InputStream stderr; // stderr of the process
+
+    // Input/Output handlers
+    InputHandler inputHandler;
+    OutputHandler outputHandler;
+
+    // Statistics, updated by the input/output threads as records flow
+    protected long inputRecords = 0;
+    protected long inputBytes = 0;
+    protected long outputRecords = 0;
+    protected long outputBytes = 0;
+
+    // Last failure recorded by the input, output or error thread
+    protected volatile Throwable outerrThreadsError;
+    // Operator whose binary input/output queues are served by the threads
+    private POStream poStream;
+    // Input thread used by run() when the input is ASYNCHRONOUS (file based)
+    private ProcessInputThread fileInputThread;
+    
+    /**
+     * Create a new {@link ExecutableManager}. The instance is not usable
+     * until {@link #configure(POStream)} has been called.
+     */
+    public ExecutableManager() {
+    }
+
+    /**
+     * Configure and initialize the {@link ExecutableManager}.
+     * 
+     * @param properties
+     *            {@link Properties} for the <code>ExecutableManager</code>
+     * @param command
+     *            {@link StreamingCommand} to be run by the
+     *            <code>ExecutableManager</code>
+     * @param endOfPipe
+     *            {@link DataCollector} to be used to push results of the
+     *            <code>StreamingCommand</code> down
+     * @throws IOException
+     * @throws ExecException
+     */
+    public void configure(POStream stream) throws IOException, ExecException {
+        this.poStream = stream;
+        this.command = stream.getCommand();
+        String[] argv = this.command.getCommandArgs();
+        argvAsString = "";
+        for (String arg : argv) {
+            argvAsString += arg;
+            argvAsString += " ";
+        }
+
+        // Create the input/output handlers
+        this.inputHandler = HandlerFactory.createInputHandler(command);
+        this.outputHandler = HandlerFactory.createOutputHandler(command);
+    }
+
+    /**
+     * Close and cleanup the {@link ExecutableManager}.
+     * @throws IOException 
+     */
+    public void close() throws IOException {
+        // Close the InputHandler, which in some cases lets the process
+        // terminate
+        inputHandler.close(process);
+
+        // Check if we need to start the process now ...
+        if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+            exec();
+        }
+
+        // Wait for the process to exit
+        try {
+            exitCode = process.waitFor();
+        } catch (InterruptedException ie) {
+            LOG.error("Unexpected exception while waiting for streaming binary to complete", ie);
+            killProcess(process);
+        }
+        
+        // Wait for stdout thread to complete
+        try {
+            if (stdoutThread != null) {
+                stdoutThread.join(0);
+            }
+            stdoutThread = null;
+        } catch (InterruptedException ie) {
+            LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie);
+            killProcess(process);
+        }
+        
+        // Wait for stderr thread to complete
+        try {
+            if (stderrThread != null) {
+                stderrThread.join(0);
+            }
+            stderrThread = null;
+        } catch (InterruptedException ie) {
+            LOG.error("Unexpected exception while waiting for input thread for streaming binary to complete", ie);
+            killProcess(process);
+        }
+
+        LOG.debug("Process exited with: " + exitCode);
+        if (exitCode != SUCCESS) {
+            LOG.error(command + " failed with exit status: "
+                    + exitCode);
+        }
+
+        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+
+            // Trigger the outputHandler
+            outputHandler.bindTo("", null, 0, -1);
+            
+            // start thread to process output from executable's stdout
+            stdoutThread = new ProcessOutputThread(outputHandler, poStream);
+            stdoutThread.start();
+        }
+        
+        // Check if there was a problem with the managed process
+        if (outerrThreadsError != null) {
+            LOG.error("Output/Error thread failed with: "
+                    + outerrThreadsError);
+        }
+
+    }
+
+    /**
+     *  Helper function to close input and output streams
+     *  to the process and kill it.
+     *  
+     *  Every step is attempted even if an earlier one fails, so a failing
+     *  handler cannot leave the process running.
+     *  
+     * @param process the process to be killed; may be <code>null</code> if
+     *                it was never started, in which case only the handlers
+     *                are closed
+     * @throws IOException if closing one of the handlers fails
+     */
+    private void killProcess(Process process) throws IOException {
+        try {
+            try {
+                inputHandler.close(process);
+            } finally {
+                outputHandler.close();
+            }
+        } finally {
+            // With file based input the process is only started in close(),
+            // so an earlier failure reaches this point without a process
+            if (process != null) {
+                process.destroy();
+            }
+        }
+    }
+
+    /**
+     * Convert path from Windows convention to Unix convention. Invoked under
+     * cygwin.
+     * 
+     * Runs <code>cygpath -u &lt;path&gt;</code> and returns the first line
+     * it prints.
+     * 
+     * @param path
+     *            path in Windows convention
+     * @return path in Unix convention, null if fail (the command could not
+     *         be started, exited with a non-zero status, its output could
+     *         not be read, or the calling thread was interrupted)
+     */
+    private String parseCygPath(String path) {
+        String[] command = new String[] { "cygpath", "-u", path };
+        Process p = null;
+        try {
+            p = Runtime.getRuntime().exec(command);
+        } catch (IOException e) {
+            return null;
+        }
+
+        BufferedReader br = new BufferedReader(new InputStreamReader(p
+                .getInputStream()));
+        try {
+            // Consume the output BEFORE waiting for the exit status so the
+            // child can never block on a full stdout pipe
+            String line = br.readLine();
+            while (br.readLine() != null) {
+                // drain anything after the first line
+            }
+            int exitVal = p.waitFor();
+            return (exitVal == 0) ? line : null;
+        } catch (IOException e) {
+            return null;
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller
+            Thread.currentThread().interrupt();
+            return null;
+        } finally {
+            try {
+                br.close();
+            } catch (IOException ignored) {
+                // nothing useful can be done about a failed close here
+            }
+            // releases the remaining streams of the helper process
+            p.destroy();
+        }
+    }
+
+    /**
+     * Set up the run-time environment of the managed process.
+     * 
+     * Appends the working directory of the process (or, if none is set on
+     * the builder, the <code>user.dir</code> of this JVM) to its
+     * <code>PATH</code>. On Windows the directory is first converted to a
+     * Unix style path via <code>cygpath</code>.
+     * 
+     * @param pb
+     *            {@link ProcessBuilder} used to exec the process
+     * @throws RuntimeException
+     *             if running on Windows and the path cannot be converted
+     */
+    protected void setupEnvironment(ProcessBuilder pb) {
+        // The command is run through bash, which uses ':' on every platform
+        String separator = ":";
+        Map<String, String> env = pb.environment();
+
+        // Add the current-working-directory to the $PATH
+        File dir = pb.directory();
+        String cwd = (dir != null) ? dir.getAbsolutePath() : System
+                .getProperty("user.dir");
+
+        // Upper-case with a fixed locale: with the default locale the check
+        // fails e.g. under Turkish, where 'i' maps to a dotted capital I
+        String osName = System.getProperty("os.name");
+        if (osName.toUpperCase(java.util.Locale.ENGLISH).startsWith("WINDOWS")) {
+            String unixCwd = parseCygPath(cwd);
+            if (unixCwd == null)
+                throw new RuntimeException(
+                        "Can not convert Windows path to Unix path under cygwin");
+            cwd = unixCwd;
+        }
+
+        String envPath = env.get(PATH);
+        if (envPath == null) {
+            envPath = cwd;
+        } else {
+            envPath = envPath + separator + cwd;
+        }
+        env.put(PATH, envPath);
+    }
+
+    /**
+     * Launch the external process.
+     * 
+     * The command is run as <code>bash -c "exec ..."</code> with the
+     * environment prepared by {@link #setupEnvironment(ProcessBuilder)}. A
+     * {@link ProcessErrorThread} is started for the <code>stderr</code> of
+     * the process and, when the output is handled synchronously, a
+     * {@link ProcessOutputThread} is started for its <code>stdout</code>.
+     * 
+     * @throws IOException
+     */
+    protected void exec() throws IOException {
+        // 'bash -c exec ...' - the shell is replaced by the actual command
+        String shellCommand = "exec " + argvAsString;
+        List<String> launchArgs = new ArrayList<String>();
+        launchArgs.add(BASH);
+        launchArgs.add("-c");
+        launchArgs.add(shellCommand);
+
+        // Prepare the environment and launch
+        ProcessBuilder builder = new ProcessBuilder(launchArgs);
+        setupEnvironment(builder);
+        process = builder.start();
+        LOG.debug("Started the process for command: " + command);
+
+        // The stderr of the process always gets its own consumer thread
+        InputStream rawStderr = process.getErrorStream();
+        stderr = new DataInputStream(new BufferedInputStream(rawStderr));
+        stderrThread = new ProcessErrorThread();
+        stderrThread.start();
+
+        // The stdout is only consumed here when the output is synchronous
+        if (outputHandler.getOutputType() != OutputType.SYNCHRONOUS) {
+            return;
+        }
+
+        InputStream rawStdout = process.getInputStream();
+        stdout = new DataInputStream(new BufferedInputStream(rawStdout));
+
+        // Hand the stream to the OutputHandler ...
+        BufferedPositionedInputStream positioned = 
+            new BufferedPositionedInputStream(stdout);
+        outputHandler.bindTo("", positioned, 0, Long.MAX_VALUE);
+
+        // ... and let a dedicated thread pump its tuples to POStream
+        stdoutThread = new ProcessOutputThread(outputHandler, poStream);
+        stdoutThread.start();
+    }
+
+    /**
+     * Start execution of the {@link ExecutableManager}.
+     * 
+     * With synchronous input the process is started right away and a thread
+     * feeds its <code>stdin</code>. With asynchronous (file based) input only
+     * the feeding thread is started; the process itself is started later, in
+     * {@link #close()}, once the input file has been written completely.
+     * 
+     * @throws IOException
+     */
+    public void run() throws IOException {
+        if (inputHandler.getInputType() != InputType.ASYNCHRONOUS) {
+            // Synchronous input: launch the executable now ...
+            exec();
+
+            // ... wire its stdin to the InputHandler ...
+            stdin = new DataOutputStream(new BufferedOutputStream(process
+                    .getOutputStream()));
+            inputHandler.bindTo(stdin);
+
+            // ... and start pushing tuples into it
+            stdinThread = new ProcessInputThread(inputHandler, poStream);
+            stdinThread.start();
+            return;
+        }
+
+        // Asynchronous input: the tuples go to a file first, so only the
+        // thread writing that file is started here. The process cannot be
+        // exec'ed before the file is complete - that happens in close()
+        fileInputThread = new ProcessInputThread(inputHandler, poStream);
+        fileInputThread.start();
+    }
+
+    /**
+     * The thread which consumes input from POStream's binaryInput queue
+     * and feeds it to the Process via the {@link InputHandler}.
+     * 
+     * The thread runs until it takes a <code>STATUS_EOP</code> result from
+     * the queue, until writing to the process fails, or until any other
+     * error occurs; errors are recorded in <code>outerrThreadsError</code>.
+     */
+    class ProcessInputThread extends Thread {
+
+        // Handler used to serialize tuples for the process
+        InputHandler inputHandler;
+        // Operator whose monitor is notified after each take from the queue
+        private POStream poStream;
+        // Queue from which the input tuples are taken
+        private BlockingQueue<Result> binaryInputQueue;
+
+        /**
+         * Create a daemon thread reading from the binary input queue of the
+         * given operator.
+         */
+        ProcessInputThread(InputHandler inputHandler, POStream poStream) {
+            setDaemon(true);
+            this.inputHandler = inputHandler;
+            this.poStream = poStream;
+            // the input queue from where this thread will read
+            // input tuples 
+            this.binaryInputQueue = poStream.getBinaryInputQueue();
+        }
+
+        public void run() {
+            try {
+                // Read tuples from the previous operator in the pipeline
+                // and pass it to the executable
+                while (true) {
+                    Result inp = null;
+                    // blocks until the producer has queued a result
+                    inp = binaryInputQueue.take();
+                    synchronized (poStream) {
+                        // notify waiting producer
+                        // the if check is to keep "findbugs" 
+                        // happy 
+                        if(inp != null)
+                            poStream.notifyAll();
+                    }
+                    // We should receive an EOP only when *ALL* input
+                    // for this process has already been sent and no
+                    // more input is expected
+                    if (inp.returnStatus == POStatus.STATUS_EOP) {
+                        // signal cleanup in ExecutableManager
+                        // (this is the close() of the enclosing class)
+                        close();
+                        return;
+                    }
+                    // results with any other status are skipped
+                    if (inp.returnStatus == POStatus.STATUS_OK) {
+                        // Check if there was a problem with the managed process
+                        if (outerrThreadsError != null) {
+                            throw new IOException(
+                                    "Output/Error thread failed with: "
+                                            + outerrThreadsError);
+                        }
+
+                        // Pass the serialized tuple to the executable via the
+                        // InputHandler
+                        Tuple t = null;
+                        try {
+                            t = (Tuple) inp.result;
+                            inputHandler.putNext(t);                            
+                        } catch (IOException e) {
+                            // if input type is synchronous then it could
+                            // be related to the process terminating
+                            if(inputHandler.getInputType() == InputType.SYNCHRONOUS) {
+                                LOG.warn("Exception while trying to write to stream binary's input", e);
+                                // could be because the process
+                                // died OR closed the input stream
+                                // we will only call close() here and not
+                                // worry about deducing whether the process died
+                                // normally or abnormally - if there was any real
+                                // issue the ProcessOutputThread should see
+                                // a non zero exit code from the process and send
+                                // a POStatus.STATUS_ERR back - what if we got
+                                // an IOException because there was only an issue with
+                                // writing to input of the binary - hmm..hope that means
+                                // the process died abnormally!!
+                                close();
+                                return;
+                            } else {
+                                // asynchronous case - then this is a real exception
+                                LOG.error("Exception while trying to write to stream binary's input", e);
+                                // send POStatus.STATUS_ERR to POStream to signal the error
+                                // Generally the ProcessOutputThread would do this but now
+                                // we should do it here since neither the process nor the
+                                // ProcessOutputThread will ever be spawned
+                                // NOTE(review): the message below has no separator
+                                // between the fixed text and e.getMessage()
+                                Result res = new Result(POStatus.STATUS_ERR, 
+                                        "Exception while trying to write to stream binary's input" + e.getMessage());
+                                sendOutput(poStream.getBinaryOutputQueue(), res);
+                                throw e;
+                            }
+                        }
+                        // statistics are only updated for tuples which were
+                        // written successfully
+                        inputBytes += t.getMemorySize();
+                        inputRecords++;
+                    }
+                }
+            } catch (Throwable t) {
+                
+                
+                // Note that an error occurred
+                outerrThreadsError = t;
+                LOG.error(t);
+                // NOTE(review): with asynchronous input the process is only
+                // started in close(), so 'process' may still be null here -
+                // confirm that killProcess() tolerates that
+                try {
+                    killProcess(process);
+                } catch (IOException ioe) {
+                    LOG.warn(ioe);
+                }
+           }
+        }
+    }
+
+    /**
+     * Put the given result onto the given queue and wake up any thread
+     * waiting on the {@link POStream} monitor.
+     * 
+     * If the calling thread is interrupted while waiting for space in the
+     * queue, the result is NOT delivered; the failure is logged and the
+     * interrupt status of the thread is restored so that the caller can
+     * still observe the interruption.
+     * 
+     * @param binaryOutputQueue queue to put the result onto
+     * @param res result to be sent
+     */
+    private void sendOutput(BlockingQueue<Result> binaryOutputQueue, Result res) {
+        try {
+            binaryOutputQueue.put(res);
+        } catch (InterruptedException e) {
+            LOG.error("Error while sending binary output to POStream", e);
+            // put() cleared the interrupt status when it threw; restore it
+            // instead of silently swallowing the interruption
+            Thread.currentThread().interrupt();
+        }
+        synchronized (poStream) {
+            // notify waiting consumer
+            // the if is to satisfy "findbugs"
+            if(res != null) {
+                poStream.notifyAll();
+            }
+        }
+    }
+    
+    /**
+     * The thread which gets output from the streaming binary and puts it onto
+     * the binary output Queue of POStream.
+     * 
+     * After the last tuple it waits for the process to exit and sends either
+     * the end-of-stream result (exit status 0) or a <code>STATUS_ERR</code>
+     * result describing the failure.
+     */
+    class ProcessOutputThread extends Thread {
+
+        // Handler used to deserialize the output of the process
+        OutputHandler outputHandler;
+        // Queue onto which the results are put
+        private BlockingQueue<Result> binaryOutputQueue;
+
+        /**
+         * Create a daemon thread writing to the binary output queue of the
+         * given operator.
+         */
+        ProcessOutputThread(OutputHandler outputHandler, POStream poStream) {
+            setDaemon(true);
+            this.outputHandler = outputHandler;
+            // the output queue where this thread will put
+            // output tuples for POStream
+            this.binaryOutputQueue = poStream.getBinaryOutputQueue();
+        }
+
+        public void run() {
+            try {
+                // Read tuples from the executable and send it to
+                // Queue of POStream
+                Tuple tuple = null;
+                while ((tuple = outputHandler.getNext()) != null) {
+                    processOutput(tuple);
+                    outputBytes += tuple.getMemorySize();
+                }
+                // output from binary is done
+                // (a null tuple makes processOutput() harvest the exit code)
+                processOutput(null);
+                outputHandler.close();
+            } catch (Throwable t) {
+                // Note that an error occurred
+                outerrThreadsError = t;
+                LOG.error("Caught Exception in OutputHandler of Streaming binary, " +
+                		"sending error signal to pipeline", t);
+                // send ERROR to POStream
+                try {
+                    Result res = new Result();
+                    res.result = "Error reading output from Streaming binary:" + 
+                    "'" + argvAsString + "':" + t.getMessage();
+                    res.returnStatus = POStatus.STATUS_ERR;
+                    sendOutput(binaryOutputQueue, res);
+                    killProcess(process);
+                } catch (Exception e) {
+                    LOG.error("Error while trying to signal Error status to pipeline", e);
+                }
+            }
+        }
+
+        /**
+         * Send one result to POStream.
+         * 
+         * @param t tuple read from the process, or <code>null</code> to
+         *          signal that the process produced no more output
+         */
+        void processOutput(Tuple t) {
+            Result res = new Result();
+            
+            if (t != null) {
+                // we have a valid tuple to pass back
+                res.result = t;
+                res.returnStatus = POStatus.STATUS_OK;
+                outputRecords++;
+            } else {
+                // t == null means end of output from
+                // binary - wait for the process to exit
+                // and harvest exit code
+                try {
+                    exitCode = process.waitFor();
+                } catch (InterruptedException ie) {
+                    try {
+                        killProcess(process);
+                    } catch (IOException e) {
+                        LOG.warn("Exception trying to kill process while processing null output " +
+                        		"from binary", e);
+                        
+                    }
+                    // signal error
+                    // NOTE(review): the interrupt status is not restored here
+                    String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
+                    ie.getMessage(); 
+                    LOG.error(errMsg, ie);
+                    res.result = errMsg;
+                    res.returnStatus = POStatus.STATUS_ERR;
+                    sendOutput(binaryOutputQueue, res);
+                    return;
+                }
+                if(exitCode == 0) {
+                    // signal EOS (End Of Stream output)
+                    res = EOS_RESULT;
+                } else {
+                    // signal Error
+                    
+                    String errMsg = "'" + argvAsString + "'" + " failed with exit status: " 
+                    + exitCode; 
+                    LOG.error(errMsg);
+                    res.result = errMsg;
+                    res.returnStatus = POStatus.STATUS_ERR;
+                }
+            }
+            // single send point for tuples, EOS and exit-status errors
+            sendOutput(binaryOutputQueue, res);
+            
+        }
+    }
+    
+    
+    
+    /**
+     * Workhorse to process the stderr stream of the managed process.
+     * 
+     * By default <code>ExecutableManager</code> just sends out the received
+     * error message to the <code>stderr</code> of itself. The message is
+     * printed as is, without appending a line terminator.
+     * 
+     * @param error
+     *            error message from the managed process.
+     */
+    protected void processError(String error) {
+        // Just send it out to our stderr
+        System.err.print(error);
+    }
+
+    /**
+     * The thread which reads the <code>stderr</code> of the managed process
+     * line by line and hands every line to
+     * {@link ExecutableManager#processError(String)}.
+     */
+    class ProcessErrorThread extends Thread {
+
+        public ProcessErrorThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            try {
+                BufferedReader errReader = new BufferedReader(
+                        new InputStreamReader(stderr));
+                // forward every line, re-adding the stripped line terminator
+                for (String line = errReader.readLine(); line != null; 
+                        line = errReader.readLine()) {
+                    processError(line + "\n");
+                }
+
+                if (stderr != null) {
+                    stderr.close();
+                    LOG.debug("ProcessErrorThread done");
+                }
+            } catch (Throwable failure) {
+                // Record the failure for the other threads, release the
+                // stream and let the thread die with the original cause
+                outerrThreadsError = failure;
+
+                LOG.error(failure);
+                closeStderrAfterFailure();
+                throw new RuntimeException(failure);
+            }
+        }
+
+        /**
+         * Close the stderr stream, only warning if that fails as well.
+         */
+        private void closeStderrAfterFailure() {
+            if (stderr == null) {
+                return;
+            }
+            try {
+                stderr.close();
+            } catch (IOException ioe) {
+                LOG.warn(ioe);
+            }
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileInputHandler} handles the input for the Pig-Streaming
+ * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input
+ * via an external file specified by the user.  
+ */
+public class FileInputHandler extends InputHandler {
+
+    // Name of the file the input is written to
+    String fileName;
+    // Stream onto that file; opened by the constructor, cleared by close()
+    OutputStream fileOutStream;
+    
+    /**
+     * Create a handler which writes to the file named by the given
+     * specification, using the serializer instantiated from it.
+     * 
+     * @param handleSpec
+     *            {@link HandleSpec} providing the file name and the spec of
+     *            the {@link StoreFunc} to use
+     * @throws ExecException
+     *             if the file cannot be opened or bound to the serializer
+     */
+    public FileInputHandler(HandleSpec handleSpec) throws ExecException {
+        fileName = handleSpec.name;
+        serializer = 
+            (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+        
+        try {
+            fileOutStream = new FileOutputStream(new File(fileName)); 
+            super.bindTo(fileOutStream);
+        } catch (IOException ioe) {
+            // If binding failed after the file was opened, release the file
+            // instead of leaking it
+            if (fileOutStream != null) {
+                try {
+                    fileOutStream.close();
+                } catch (IOException ignored) {
+                    // the original failure is the one worth reporting
+                }
+                fileOutStream = null;
+            }
+            throw new ExecException(ioe);
+        }
+    }
+
+    /**
+     * @return {@link InputType#ASYNCHRONOUS}, always
+     */
+    public InputType getInputType() {
+        return InputType.ASYNCHRONOUS;
+    }
+    
+    /**
+     * Not supported: the handler is bound to its file by the constructor.
+     * 
+     * @throws UnsupportedOperationException always
+     */
+    public void bindTo(OutputStream os) throws IOException {
+        throw new UnsupportedOperationException("Cannot call bindTo on " +
+        		                                "FileInputHandler");
+    }
+    
+    /**
+     * Close the handler and the underlying file. The file is closed even if
+     * closing the handler or flushing fails; calls after the first one are
+     * no-ops.
+     */
+    public synchronized void close(Process process) throws IOException {
+        if(!alreadyClosed) {
+            try {
+                super.close(process);
+                fileOutStream.flush();
+            } finally {
+                // never leave the file open, whatever happened above
+                try {
+                    fileOutStream.close();
+                } finally {
+                    fileOutStream = null;
+                    alreadyClosed = true;
+                }
+            }
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from
+ * an external file specified by the user.  
+ */
+public class FileOutputHandler extends OutputHandler {
+
+    // Name of the file the output is read from
+    String fileName;
+    // Stream onto that file; null until bindTo() has been called
+    BufferedPositionedInputStream fileInStream;
+    
+    /**
+     * Create a handler which reads from the file named by the given
+     * specification, using the deserializer instantiated from it. The file
+     * is not opened before {@link #bindTo} is called.
+     * 
+     * @param handleSpec
+     *            {@link HandleSpec} providing the file name and the spec of
+     *            the {@link LoadFunc} to use
+     * @throws ExecException
+     */
+    public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
+        fileName = handleSpec.name;
+        deserializer = 
+            (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+    }
+
+    /**
+     * @return {@link OutputType#ASYNCHRONOUS}, always
+     */
+    public OutputType getOutputType() {
+        return OutputType.ASYNCHRONOUS;
+    }
+    
+    /**
+     * Open the output file and bind the deserializer to its whole content.
+     * All parameters are ignored; the file given at construction is used.
+     * 
+     * @throws IOException
+     *             if the file cannot be opened or bound; the file is closed
+     *             again in the latter case
+     */
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
+        // This is a trigger to start processing the output from the file ...
+        // ... however, we must ignore the input parameters and use ones
+        // provided during initialization
+        File file = new File(this.fileName);
+        this.fileInStream = 
+            new BufferedPositionedInputStream(new FileInputStream(file)); 
+        try {
+            super.bindTo(this.fileName, this.fileInStream, 0, file.length());
+        } catch (IOException ioe) {
+            // don't leak the open file if the deserializer can't bind to it
+            try {
+                this.fileInStream.close();
+            } catch (IOException ignored) {
+                // the original failure is the one worth reporting
+            }
+            this.fileInStream = null;
+            throw ioe;
+        }
+    }
+    
+    /**
+     * Close the handler and the underlying file, if it was opened. Calls
+     * after the first successful one are no-ops.
+     */
+    public synchronized void close() throws IOException {
+        if(!alreadyClosed) {
+            super.close();
+            // the file is only opened by bindTo(), which may never have
+            // been called (e.g. the process is killed before its output
+            // is read)
+            if (fileInStream != null) {
+                fileInStream.close();
+                fileInStream = null;
+            }
+            alreadyClosed = true;
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * Factory to create an {@link InputHandler} or {@link OutputHandler}
+ * depending on the specification of the {@link StreamingCommand}.
+ */
+public class HandlerFactory {
+
+    /**
+     * Create an <code>InputHandler</code> for the given input specification
+     * of the <code>StreamingCommand</code>.
+     *
+     * A {@link DefaultInputHandler} is returned when the command has no input
+     * specification or when the first one is named <code>stdin</code>;
+     * otherwise a {@link FileInputHandler} is returned.
+     *
+     * @param command <code>StreamingCommand</code>
+     * @return <code>InputHandler</code> for the given input specification
+     * @throws ExecException
+     */
+    public static InputHandler createInputHandler(StreamingCommand command)
+    throws ExecException {
+        HandleSpec in = firstHandleSpec(command, Handle.INPUT);
+        if (in == null) {
+            return new DefaultInputHandler();
+        }
+
+        return (in.name.equals("stdin")) ? new DefaultInputHandler(in) :
+                                           new FileInputHandler(in);
+    }
+
+    /**
+     * Create an <code>OutputHandler</code> for the given output specification
+     * of the <code>StreamingCommand</code>.
+     *
+     * A {@link DefaultOutputHandler} is returned when the command has no
+     * output specification or when the first one is named <code>stdout</code>;
+     * otherwise a {@link FileOutputHandler} is returned.
+     *
+     * @param command <code>StreamingCommand</code>
+     * @return <code>OutputHandler</code> for the given output specification
+     * @throws ExecException
+     */
+    public static OutputHandler createOutputHandler(StreamingCommand command)
+    throws ExecException {
+        HandleSpec out = firstHandleSpec(command, Handle.OUTPUT);
+        if (out == null) {
+            return new DefaultOutputHandler();
+        }
+
+        return (out.name.equals("stdout")) ? new DefaultOutputHandler(out) :
+                                             new FileOutputHandler(out);
+    }
+
+    /**
+     * Get the first specification registered for the given handle.
+     *
+     * @param command <code>StreamingCommand</code> to inspect
+     * @param handle <code>Handle</code> whose first specification is wanted
+     * @return the first <code>HandleSpec</code>, or <code>null</code> if the
+     *         command has no list for the handle, the list is empty, or its
+     *         first element is <code>null</code>
+     */
+    private static HandleSpec firstHandleSpec(StreamingCommand command,
+                                              Handle handle) {
+        List<HandleSpec> specs = command.getHandleSpecs(handle);
+
+        // An empty list must be treated like a missing one: get(0) on it
+        // would throw IndexOutOfBoundsException
+        if (specs == null || specs.isEmpty()) {
+            return null;
+        }
+        return specs.get(0);
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link InputHandler} is responsible for handling the input to the
+ * Pig-Streaming external command.
+ *
+ * The managed executable is fed either in a {@link InputType#SYNCHRONOUS}
+ * manner via its <code>stdin</code>, or in an {@link InputType#ASYNCHRONOUS}
+ * manner via an external file which the executable reads afterwards.
+ */
+public abstract class InputHandler {
+    /**
+     * The ways in which input can be delivered to the managed executable.
+     */
+    public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+    /*
+     * Serializer used to send data to the managed process.
+     *
+     * Concrete sub-classes are responsible for setting it up and managing it.
+     */
+    protected StoreFunc serializer;
+
+    // set once close() has run, so that later calls become no-ops
+    protected boolean alreadyClosed = false;
+
+    /**
+     * Get the handled <code>InputType</code>
+     * @return the handled <code>InputType</code>
+     */
+    public abstract InputType getInputType();
+
+    /**
+     * Hand the given input <code>Tuple</code> to the serializer so that it is
+     * sent to the managed executable.
+     *
+     * @param t input <code>Tuple</code>
+     * @throws IOException
+     */
+    public void putNext(Tuple t) throws IOException {
+        serializer.putNext(t);
+    }
+
+    /**
+     * Close the <code>InputHandler</code> since there is no more input
+     * to be sent to the managed process. Only the first call finishes the
+     * serializer; subsequent calls do nothing.
+     *
+     * @param process the managed process - this could be null in some cases
+     * like when input is through files. In that case, the process would not
+     * have been exec'ed yet - if this method is overridden it is the
+     * responsibility of the implementer to check that the process is usable.
+     * The managed process object is supplied by the ExecutableManager to this
+     * call so that this method can check if the process is alive if it needs
+     * to know. This base implementation does not use it.
+     *
+     * @throws IOException
+     */
+    public synchronized void close(Process process) throws IOException {
+        if (alreadyClosed) {
+            return;
+        }
+
+        serializer.finish();
+        alreadyClosed = true;
+    }
+
+    /**
+     * Bind the <code>InputHandler</code> to the <code>OutputStream</code>
+     * which is handed to the serializer for the input data of the managed
+     * process.
+     *
+     * @param os <code>OutputStream</code> to bind the serializer to
+     * @throws IOException
+     */
+    public void bindTo(OutputStream os) throws IOException {
+        serializer.bindTo(os);
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link OutputHandler} is responsible for handling the output of the
+ * Pig-Streaming external command.
+ *
+ * The output of the managed executable is fetched either in a
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code>, or in an
+ * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
+ * process wrote its output.
+ */
+public abstract class OutputHandler {
+    /**
+     * The ways in which output can be fetched from the managed executable.
+     */
+    public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+    /*
+     * Deserializer used to read the output data of the managed process.
+     *
+     * Concrete sub-classes are responsible for setting it up and managing it.
+     */
+    protected LoadFunc deserializer;
+
+    /**
+     * Get the handled <code>OutputType</code>.
+     * @return the handled <code>OutputType</code>
+     */
+    public abstract OutputType getOutputType();
+
+    // flag for sub-classes to record that close() has already been called
+    protected boolean alreadyClosed = false;
+
+    /**
+     * Bind the <code>OutputHandler</code> to the stream from which to read
+     * the output data of the managed process.
+     *
+     * The given stream is wrapped in a new
+     * <code>BufferedPositionedInputStream</code> before it is handed to the
+     * deserializer.
+     *
+     * @param fileName name passed through to the deserializer
+     * @param is stream from which to read the output data of the managed
+     *           process
+     * @param offset offset passed through to the deserializer
+     * @param end end position passed through to the deserializer
+     * @throws IOException
+     */
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+                       long offset, long end) throws IOException {
+        BufferedPositionedInputStream wrapped =
+            new BufferedPositionedInputStream(is);
+        deserializer.bindTo(fileName, wrapped, offset, end);
+    }
+
+    /**
+     * Get the next output <code>Tuple</code> of the managed process, as
+     * produced by the deserializer.
+     *
+     * @return the next output <code>Tuple</code> of the managed process
+     * @throws IOException
+     */
+    public Tuple getNext() throws IOException {
+        Tuple next = deserializer.getNext();
+        return next;
+    }
+
+    /**
+     * Close the <code>OutputHandler</code>. This base implementation does
+     * nothing.
+     * @throws IOException
+     */
+    public synchronized void close() throws IOException {
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Sep  4 14:25:41 2008
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
+/**
+ * {@link StreamingCommand} represents the specification of an external
+ * command to be executed in a Pig Query.
+ *
+ * <code>StreamingCommand</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class StreamingCommand implements Serializable, Cloneable {
+    private static final long serialVersionUID = 1L;
+
+    // External command to be executed and its parsed components
+    String executable;
+    String[] argv;
+
+    // Files to be shipped to the cluster in-order to be executed
+    List<String> shipSpec = new LinkedList<String>();
+
+    // Files to be cached on the execute nodes of the cluster
+    List<String> cacheSpec = new LinkedList<String>();
+
+    /**
+     * Handle to communicate with the external process.
+     */
+    public enum Handle {INPUT, OUTPUT}
+
+    /**
+     * Map from the input/output handles to their specifications
+     */
+    Map<Handle, List<HandleSpec>> handleSpecs =
+        new TreeMap<Handle, List<HandleSpec>>();
+
+    // Should the stderr of the process be persisted?
+    boolean persistStderr = false;
+
+    // Directory where the process's stderr logs should be persisted.
+    String logDir;
+
+    // Limit on the number of persisted log-files
+    int logFilesLimit = 100;
+
+    // Upper bound enforced by setLogFilesLimit()
+    public static final int MAX_TASKS = 100;
+
+    // Should the files of this command be shipped?
+    boolean shipFiles = true;
+
+    // Context used to validate cache specifications against the shared filesystem
+    private PigContext pigContext;
+
+    /**
+     * Create a new <code>StreamingCommand</code> with the given command.
+     *
+     * @param pigContext <code>PigContext</code> used to validate the paths
+     *                   given to {@link #addPathToCache(String)}
+     * @param argv parsed arguments of the command; <code>argv[0]</code> is
+     *             taken to be the executable
+     * @throws IllegalArgumentException if <code>argv</code> is
+     *         <code>null</code> or empty
+     */
+    public StreamingCommand(PigContext pigContext, String[] argv) {
+        // Fail with a clear message rather than an NPE or an
+        // ArrayIndexOutOfBoundsException on argv[0] below
+        if (argv == null || argv.length == 0) {
+            throw new IllegalArgumentException(
+                    "Streaming command needs at least the executable");
+        }
+
+        this.pigContext = pigContext;
+        this.argv = argv;
+
+        // Assume that argv[0] is the executable
+        this.executable = this.argv[0];
+    }
+
+    /**
+     * Get the command to be executed.
+     *
+     * @return the command to be executed
+     */
+    public String getExecutable() {
+        return executable;
+    }
+
+    /**
+     * Set the executable for the <code>StreamingCommand</code>.
+     *
+     * @param executable the executable for the <code>StreamingCommand</code>
+     */
+    public void setExecutable(String executable) {
+        this.executable = executable;
+    }
+
+    /**
+     * Set the command line arguments for the <code>StreamingCommand</code>.
+     *
+     * Note that this does not update the executable.
+     *
+     * @param argv the command line arguments for the
+     *             <code>StreamingCommand</code>
+     */
+    public void setCommandArgs(String[] argv) {
+        this.argv = argv;
+    }
+
+    /**
+     * Get the parsed command arguments.
+     *
+     * @return the parsed command arguments as <code>String[]</code>
+     */
+    public String[] getCommandArgs() {
+        return argv;
+    }
+
+    /**
+     * Get the list of files which need to be shipped to the cluster.
+     *
+     * @return the list of files which need to be shipped to the cluster
+     */
+    public List<String> getShipSpecs() {
+        return shipSpec;
+    }
+
+    /**
+     * Get the list of files which need to be cached on the execute nodes.
+     *
+     * @return the list of files which need to be cached on the execute nodes
+     */
+    public List<String> getCacheSpecs() {
+        return cacheSpec;
+    }
+
+    /**
+     * Add a file to be shipped to the cluster.
+     *
+     * Users can use this to distribute executables and other necessary files
+     * to the clusters.
+     *
+     * @param path path of the file to be shipped to the cluster
+     * @throws IOException if <code>path</code> does not exist on the local
+     *         filesystem or is a directory
+     */
+    public void addPathToShip(String path) throws IOException {
+        // Validate
+        File file = new File(path);
+        if (!file.exists()) {
+            throw new IOException("Invalid ship specification: '" + path +
+                                  "' does not exist!");
+        } else if (file.isDirectory()) {
+            throw new IOException("Invalid ship specification: '" + path +
+                                  "' is a directory and can't be shipped!");
+        }
+        shipSpec.add(path);
+    }
+
+    /**
+     * Add a file to be cached on execute nodes on the cluster. The file is
+     * assumed to be available at the shared filesystem.
+     *
+     * The path is validated without its URI fragment and query, but it is
+     * recorded exactly as given.
+     *
+     * @param path path of the file to be cached on the execute nodes
+     * @throws IOException if <code>path</code> is not a valid URI, does not
+     *         exist or is a directory
+     */
+    public void addPathToCache(String path) throws IOException {
+        // Validate
+        URI pathUri = null;
+        URI dfsPath = null;
+        try {
+            pathUri = new URI(path);
+
+            // Strip away the URI's _fragment_ and _query_
+            dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(),
+                              pathUri.getPath(), null, null);
+        } catch (URISyntaxException urise) {
+            IOException invalid =
+                new IOException("Invalid cache specification: " + path);
+            // Keep the parse failure around for diagnosis
+            invalid.initCause(urise);
+            throw invalid;
+        }
+
+        boolean exists = false;
+        try {
+            exists = FileLocalizer.fileExists(dfsPath.toString(), pigContext);
+        } catch (IOException ioe) {
+            // Throw a better error message, but keep the underlying failure
+            // attached as the cause so that it is not lost
+            IOException invalid =
+                new IOException("Invalid cache specification: '" + dfsPath +
+                                "' does not exist!");
+            invalid.initCause(ioe);
+            throw invalid;
+        }
+
+        if (!exists) {
+            throw new IOException("Invalid cache specification: '" + dfsPath +
+                                  "' does not exist!");
+        } else if (FileLocalizer.isDirectory(dfsPath.toString(), pigContext)) {
+            throw new IOException("Invalid cache specification: '" + dfsPath +
+                                  "' is a directory and can't be cached!");
+        }
+
+        cacheSpec.add(path);
+    }
+
+    /**
+     * Attach a {@link HandleSpec} to a given {@link Handle}
+     * @param handle <code>Handle</code> to which the specification is to
+     *               be attached.
+     * @param handleSpec <code>HandleSpec</code> for the given handle.
+     */
+    public void addHandleSpec(Handle handle, HandleSpec handleSpec) {
+        List<HandleSpec> handleSpecList = handleSpecs.get(handle);
+
+        // First specification for this handle: create its list lazily
+        if (handleSpecList == null) {
+            handleSpecList = new LinkedList<HandleSpec>();
+            handleSpecs.put(handle, handleSpecList);
+        }
+
+        handleSpecList.add(handleSpec);
+    }
+
+    /**
+     * Set the input specification for the <code>StreamingCommand</code>.
+     *
+     * It replaces the first input specification if there is one, else it is
+     * added as the first.
+     *
+     * @param spec input specification
+     */
+    public void setInputSpec(HandleSpec spec) {
+        List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+        if (inputSpecs == null || inputSpecs.size() == 0) {
+            addHandleSpec(Handle.INPUT, spec);
+        } else {
+            inputSpecs.set(0, spec);
+        }
+    }
+
+    /**
+     * Get the input specification of the <code>StreamingCommand</code>.
+     *
+     * If none has been set yet, a default one (<code>stdin</code> with
+     * {@link PigStorage}) is registered first and then returned.
+     *
+     * @return input specification of the <code>StreamingCommand</code>
+     */
+    public HandleSpec getInputSpec() {
+        List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+        if (inputSpecs == null || inputSpecs.size() == 0) {
+            addHandleSpec(Handle.INPUT, new HandleSpec("stdin", PigStorage.class.getName()));
+        }
+        return getHandleSpecs(Handle.INPUT).get(0);
+    }
+
+    /**
+     * Set the specification for the primary output of the
+     * <code>StreamingCommand</code>.
+     *
+     * It replaces the first output specification if there is one, else it is
+     * added as the first.
+     *
+     * @param spec specification for the primary output of the
+     *             <code>StreamingCommand</code>
+     */
+    public void setOutputSpec(HandleSpec spec) {
+        List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() == 0) {
+            addHandleSpec(Handle.OUTPUT, spec);
+        } else {
+            outputSpecs.set(0, spec);
+        }
+    }
+
+    /**
+     * Get the specification of the primary output of the
+     * <code>StreamingCommand</code>.
+     *
+     * If none has been set yet, a default one (<code>stdout</code> with
+     * {@link PigStorage}) is registered first and then returned.
+     *
+     * @return specification of the primary output of the
+     *         <code>StreamingCommand</code>
+     */
+    public HandleSpec getOutputSpec() {
+        List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() == 0) {
+            addHandleSpec(Handle.OUTPUT, new HandleSpec("stdout", PigStorage.class.getName()));
+        }
+        return getHandleSpecs(Handle.OUTPUT).get(0);
+    }
+
+    /**
+     * Get specifications for the given <code>Handle</code>.
+     *
+     * @param handle <code>Handle</code> of the stream
+     * @return specifications for the given <code>Handle</code>, or
+     *         <code>null</code> if none has been added for it
+     */
+    public List<HandleSpec> getHandleSpecs(Handle handle) {
+        return handleSpecs.get(handle);
+    }
+
+    /**
+     * Should the stderr of the managed process be persisted?
+     *
+     * @return <code>true</code> if the stderr of the managed process should be
+     *         persisted, <code>false</code> otherwise.
+     */
+    public boolean getPersistStderr() {
+        return persistStderr;
+    }
+
+    /**
+     * Specify if the stderr of the managed process should be persisted.
+     *
+     * @param persistStderr <code>true</code> if the stderr of the managed
+     *                      process should be persisted, else <code>false</code>
+     */
+    public void setPersistStderr(boolean persistStderr) {
+        this.persistStderr = persistStderr;
+    }
+
+    /**
+     * Get the directory where the log-files of the command are persisted.
+     *
+     * @return the directory where the log-files of the command are persisted
+     */
+    public String getLogDir() {
+        return logDir;
+    }
+
+    /**
+     * Set the directory where the log-files of the command are persisted.
+     *
+     * A single leading <code>/</code> is stripped from the given directory,
+     * and persisting of stderr is switched on as a side-effect.
+     *
+     * @param logDir the directory where the log-files of the command are persisted
+     */
+    public void setLogDir(String logDir) {
+        this.logDir = logDir;
+        if (this.logDir.startsWith("/")) {
+            this.logDir = this.logDir.substring(1);
+        }
+        setPersistStderr(true);
+    }
+
+    /**
+     * Get the maximum number of tasks whose stderr logs files are persisted.
+     *
+     * @return the maximum number of tasks whose stderr logs files are persisted
+     */
+    public int getLogFilesLimit() {
+        return logFilesLimit;
+    }
+
+    /**
+     * Set the maximum number of tasks whose stderr logs files are persisted.
+     * Values above {@link #MAX_TASKS} are capped at {@link #MAX_TASKS}.
+     *
+     * @param logFilesLimit the maximum number of tasks whose stderr logs files
+     *                      are persisted
+     */
+    public void setLogFilesLimit(int logFilesLimit) {
+        this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit);
+    }
+
+    /**
+     * Set whether files should be shipped or not.
+     *
+     * @param shipFiles <code>true</code> if files of this command should be
+     *                  shipped, <code>false</code> otherwise
+     */
+    public void setShipFiles(boolean shipFiles) {
+        this.shipFiles = shipFiles;
+    }
+
+    /**
+     * Get whether files for this command should be shipped or not.
+     *
+     * @return <code>true</code> if files of this command should be shipped,
+     *         <code>false</code> otherwise
+     */
+    public boolean getShipFiles() {
+        return shipFiles;
+    }
+
+    /**
+     * Render the command as its space-separated arguments followed by
+     * <code>(input-spec/output-spec)</code>.
+     *
+     * Note that this goes through {@link #getInputSpec()} and
+     * {@link #getOutputSpec()}, which register the default specifications
+     * if none have been set.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (String arg : getCommandArgs()) {
+            sb.append(arg);
+            sb.append(" ");
+        }
+        sb.append("(" + getInputSpec().toString() + "/"+getOutputSpec() + ")");
+
+        return sb.toString();
+    }
+
+    /**
+     * Create a copy of this <code>StreamingCommand</code> which has its own
+     * argument array, its own ship/cache lists and its own copies of the
+     * handle specifications.
+     */
+    public Object clone() {
+      try {
+        StreamingCommand clone = (StreamingCommand)super.clone();
+
+        // Do not share the argument array with the original
+        if (argv != null) {
+          clone.argv = argv.clone();
+        }
+
+        clone.shipSpec = new ArrayList<String>(shipSpec);
+        clone.cacheSpec = new ArrayList<String>(cacheSpec);
+
+        clone.handleSpecs = new HashMap<Handle, List<HandleSpec>>();
+        for (Map.Entry<Handle, List<HandleSpec>> e : handleSpecs.entrySet()) {
+          List<HandleSpec> values = new ArrayList<HandleSpec>();
+          for (HandleSpec spec : e.getValue()) {
+            values.add((HandleSpec)spec.clone());
+          }
+          clone.handleSpecs.put(e.getKey(), values);
+        }
+
+        return clone;
+      } catch (CloneNotSupportedException cnse) {
+        // Shouldn't happen since we do implement Cloneable
+        throw new InternalError(cnse.toString());
+      }
+    }
+
+
+    /**
+     * Specification about the usage of the {@link Handle} to communicate
+     * with the external process.
+     *
+     * It specifies the stream-handle which can be one of <code>stdin</code>/
+     * <code>stdout</code>/<code>stderr</code> or a named file and also the
+     * serializer/deserializer specification to be used to read/write data
+     * to/from the stream.
+     */
+    public static class HandleSpec
+    implements Comparable<HandleSpec>, Serializable, Cloneable {
+        private static final long serialVersionUID = 1L;
+
+        String name;
+        String spec;
+
+        /**
+         * Create a new {@link HandleSpec} with a given name using the default
+         * {@link PigStorage} serializer/deserializer.
+         *
+         * @param handleName name of the handle (one of <code>stdin</code>,
+         *                   <code>stdout</code> or a file-path)
+         */
+        public HandleSpec(String handleName) {
+            this(handleName, PigStorage.class.getName());
+        }
+
+        /**
+         * Create a new {@link HandleSpec} with a given name using the given
+         * serializer/deserializer specification.
+         *
+         * @param handleName name of the handle (one of <code>stdin</code>,
+         *                   <code>stdout</code> or a file-path)
+         * @param spec serializer/deserializer spec
+         */
+        public HandleSpec(String handleName, String spec) {
+            this.name = handleName;
+            this.spec = spec;
+        }
+
+        /**
+         * Order <code>HandleSpec</code>s by their name only; the
+         * serializer/deserializer spec is not taken into account.
+         */
+        public int compareTo(HandleSpec o) {
+            return this.name.compareTo(o.name);
+        }
+
+        public String toString() {
+            return name + "-" + spec;
+        }
+
+        /**
+         * Get the <b>name</b> of the <code>HandleSpec</code>.
+         *
+         * @return the <b>name</b> of the <code>HandleSpec</code> (one of
+         *         <code>stdin</code>, <code>stdout</code> or a file-path)
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * Set the <b>name</b> of the <code>HandleSpec</code>.
+         *
+         * @param name <b>name</b> of the <code>HandleSpec</code> (one of
+         *         <code>stdin</code>, <code>stdout</code> or a file-path)
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Get the serializer/deserializer spec of the <code>HandleSpec</code>.
+         *
+         * @return the serializer/deserializer spec of the
+         *         <code>HandleSpec</code>
+         */
+        public String getSpec() {
+            return spec;
+        }
+
+        /**
+         * Set the serializer/deserializer spec of the <code>HandleSpec</code>.
+         *
+         * @param spec the serializer/deserializer spec of the
+         *             <code>HandleSpec</code>
+         */
+        public void setSpec(String spec) {
+            this.spec = spec;
+        }
+
+        /**
+         * Two <code>HandleSpec</code>s are equal if both their names and
+         * their serializer/deserializer specs are equal.
+         *
+         * @param obj object to compare with
+         * @return <code>true</code> if <code>obj</code> is a
+         *         <code>HandleSpec</code> with the same name and spec,
+         *         <code>false</code> otherwise (including for
+         *         <code>null</code> and objects of other types)
+         */
+        public boolean equals(Object obj) {
+          if (this == obj) {
+            return true;
+          }
+          // Also rejects null, instead of failing on the cast below
+          if (!(obj instanceof HandleSpec)) {
+            return false;
+          }
+          HandleSpec other = (HandleSpec)obj;
+          return (sameString(name, other.name) &&
+                  sameString(spec, other.spec));
+        }
+
+        /**
+         * Hash code computed from the name and the spec, so that it is
+         * consistent with {@link #equals(Object)}.
+         */
+        public int hashCode() {
+          int hash = (name == null) ? 0 : name.hashCode();
+          return 31 * hash + ((spec == null) ? 0 : spec.hashCode());
+        }
+
+        // Null-safe string equality
+        private static boolean sameString(String a, String b) {
+          return (a == null) ? (b == null) : a.equals(b);
+        }
+
+        public Object clone() {
+          try {
+            return super.clone();
+          } catch (CloneNotSupportedException cnse) {
+            // Shouldn't happen since we do implement Cloneable
+            throw new InternalError(cnse.toString());
+          }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Thu Sep  4 14:25:41 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.tools.grunt;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
@@ -172,6 +173,15 @@
             //mPigServer.setJobName(unquote(value));
             mPigServer.setJobName(value);
         }
+        else if (key.equals("stream.skippath")) {
+            // Validate
+            File file = new File(value);
+            if (!file.exists() || file.isDirectory()) {
+                throw new IOException("Invalid value for stream.skippath:" + 
+                                      value); 
+            }
+            mPigServer.addPathToSkip(value);
+        }
         else
         {
             // other key-value pairs can go there