You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/27 20:01:46 UTC

svn commit: r641943 - in /incubator/pig/trunk/src/org/apache/pig: backend/hadoop/streaming/ impl/streaming/

Author: olga
Date: Thu Mar 27 12:01:32 2008
New Revision: 641943

URL: http://svn.apache.org/viewvc?rev=641943&view=rev
Log:
Files that I forgot to svn add as part of commit for PIG-94 patch

Added:
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link HadoopExecutableManager} is a specialization of 
+ * {@link ExecutableManager} and provides HDFS-specific support for secondary
+ * outputs, task-logs etc.
+ * 
+ * <code>HadoopExecutableManager</code> provides support for  secondary outputs
+ * of the managed process and also persists the logs of the tasks on HDFS. 
+ */
+public class HadoopExecutableManager extends ExecutableManager {
+
+    JobConf job;
+
+    String scriptOutputDir;
+    String scriptLogDir;
+    String taskId;
+
+    // HDFS file receiving the task's stderr; null when it is not persisted
+    FSDataOutputStream errorStream;
+
+    // NOTE(review): not read anywhere in this class
+    boolean writeHeaderFooter = false;
+
+    public HadoopExecutableManager() {}
+
+    /**
+     * Configures the manager for the given streaming command.
+     *
+     * In addition to the base-class configuration this makes a relative
+     * executable <code>a+x</code>, and records the script output directory,
+     * the stderr log directory, the task id and the current JobConf.
+     *
+     * @param properties properties carrying the pig.streaming.* settings
+     * @param command streaming command to manage
+     * @param endOfPipe successor which receives the output tuples
+     * @throws IOException if the base-class configuration fails
+     * @throws ExecException if the chmod of the executable is interrupted
+     */
+    public void configure(Properties properties, StreamingCommand command,
+                          DataCollector endOfPipe)
+    throws IOException, ExecException {
+        super.configure(properties, command, endOfPipe);
+
+        // Chmod +x the executable. Absolute paths are not ours to touch,
+        // so for those we just hope they are already executable.
+        File executable = new File(command.getExecutable());
+        if (!executable.isAbsolute()) {
+            try {
+                FileUtil.chmod(executable.toString(), "a+x");
+            } catch (InterruptedException ie) {
+                // Preserve the interrupt status for whoever owns this thread
+                Thread.currentThread().interrupt();
+                throw new ExecException(ie);
+            }
+        }
+
+        // Save the output directory for the Pig Script
+        scriptOutputDir = properties.getProperty("pig.streaming.task.output.dir");
+        scriptLogDir = properties.getProperty("pig.streaming.log.dir");
+
+        // Save the taskid
+        taskId = properties.getProperty("pig.streaming.task.id");
+
+        // Save a copy of the JobConf
+        job = PigMapReduce.getPigContext().getJobConf();
+    }
+
+    /**
+     * Opens the HDFS stderr log file (when this task should persist it),
+     * writes the debug header and then starts the managed process.
+     *
+     * @throws IOException if the process cannot be started
+     */
+    protected void exec() throws IOException {
+        // Create the HDFS file for the stderr of the task, if necessary
+        if (writeErrorToHDFS(command.getLogFilesLimit(), taskId)) {
+            try {
+                Path errorFile =
+                    new Path(new Path(scriptLogDir, command.getLogDir()), taskId);
+                errorStream =
+                    errorFile.getFileSystem(job).create(errorFile);
+            } catch (IOException ie) {
+                // Don't fail the task if we couldn't save its stderr on HDFS
+                System.err.println("Failed to create stderr file of task: " +
+                                   taskId + " in HDFS at " + scriptLogDir +
+                                   " with " + ie);
+                errorStream = null;
+            }
+        }
+
+        // Header for stderr file of the task
+        writeDebugHeader();
+
+        // Exec the command ...
+        super.exec();
+    }
+
+    /**
+     * Waits for the managed process to finish, writes the debug footer,
+     * copies the secondary outputs to HDFS and closes the stderr log file.
+     *
+     * The footer is written and the stderr log file is closed even when the
+     * process fails. That is when the footer (exit code, byte counts) is
+     * most useful, and the HDFS stream must not leak.
+     *
+     * @throws IOException if the base-class teardown fails
+     * @throws ExecException if the process failed or a secondary output
+     *                       could not be saved
+     */
+    public void close() throws IOException, ExecException {
+        try {
+            try {
+                super.close();
+            } finally {
+                // Footer for stderr file of the task
+                writeDebugFooter();
+            }
+
+            // Copy the secondary outputs of the task to HDFS
+            copySecondaryOutputs();
+        } finally {
+            // Close the stderr file on HDFS
+            closeErrorStream();
+        }
+    }
+
+    /**
+     * Copies every output handle after the first (the secondary outputs)
+     * from the local file system to the script output directory, naming
+     * each copy <code>taskId-fileName</code>.
+     */
+    private void copySecondaryOutputs() throws IOException, ExecException {
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() < 2) {
+            // No secondary outputs: don't even resolve the output directory
+            return;
+        }
+
+        Path outputDir = new Path(scriptOutputDir);
+        FileSystem fs = outputDir.getFileSystem(job);
+        for (int i = 1; i < outputSpecs.size(); ++i) {
+            String fileName = outputSpecs.get(i).getName();
+            try {
+                fs.copyFromLocalFile(false, true, new Path(fileName),
+                                     new Path(outputDir,
+                                              taskId+"-"+fileName));
+            } catch (IOException ioe) {
+                System.err.println("Failed to save secondary output '" +
+                                   fileName + "' of task: " + taskId +
+                                   " with " + ioe);
+                throw new ExecException(ioe);
+            }
+        }
+    }
+
+    /**
+     * Best-effort close of the HDFS stderr log file. As in exec(), failing
+     * to persist stderr is reported but does not fail the task.
+     */
+    private void closeErrorStream() {
+        if (errorStream == null) {
+            return;
+        }
+        FSDataOutputStream stream = errorStream;
+        errorStream = null;
+        try {
+            stream.close();
+        } catch (IOException ioe) {
+            System.err.println("Failed to close stderr file of task: " +
+                               taskId + " in HDFS at " + scriptLogDir +
+                               " with " + ioe);
+        }
+    }
+
+    /**
+     * Should the stderr data of this task be persisted on HDFS?
+     *
+     * @param limit maximum number of tasks whose stderr log-files are persisted
+     * @param taskId id of the task
+     * @return <code>true</code> if stderr data of task should be persisted on
+     *         HDFS, <code>false</code> otherwise (including when the task id
+     *         does not have the expected shape)
+     */
+    private boolean writeErrorToHDFS(int limit, String taskId) {
+        if (!command.getPersistStderr()) {
+            return false;
+        }
+
+        // These are hard-coded begin/end offsets into a Hadoop *taskid*;
+        // presumably they select the 6-digit task number, e.g. 000123 in
+        // task_200803271200_0001_m_000123_0 (TODO confirm for newer Hadoop)
+        int beginIndex = 25, endIndex = 31;
+        if (taskId == null || taskId.length() < endIndex) {
+            return false;
+        }
+
+        try {
+            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            return tipId < limit;
+        } catch (NumberFormatException nfe) {
+            // Unrecognized taskid format: just don't persist stderr
+            return false;
+        }
+    }
+
+    /**
+     * Forwards stderr data to the base class and, if enabled, also appends
+     * it to the task's HDFS stderr log file.
+     *
+     * @param error chunk of stderr text from the managed process
+     */
+    protected void processError(String error) {
+        super.processError(error);
+
+        try {
+            if (errorStream != null) {
+                // Encode explicitly: DataOutputStream.writeBytes() would drop
+                // the high byte of every char and garble non-ASCII output
+                errorStream.write(error.getBytes("UTF-8"));
+            }
+        } catch (IOException ioe) {
+            super.processError("Failed to save error logs to HDFS with: " +
+                               ioe);
+        }
+    }
+
+    /** Writes the task information header (command, start time, split). */
+    private void writeDebugHeader() {
+        processError("===== Task Information Header =====" );
+
+        processError("\nCommand: " + command.getExecutable());
+        processError("\nStart time: " + new Date(System.currentTimeMillis()));
+        processError("\nInput-split file: " + job.get("map.input.file"));
+        processError("\nInput-split start-offset: " +
+                job.getLong("map.input.start", -1));
+        processError("\nInput-split length: " +
+                job.getLong("map.input.length", -1));
+
+        processError("\n=====          * * *          =====\n");
+    }
+
+    /** Writes the task information footer (end time, exit code, byte counts). */
+    private void writeDebugFooter() {
+        processError("===== Task Information Footer =====");
+
+        processError("\nEnd time: " + new Date(System.currentTimeMillis()));
+        processError("\nExit code: " + exitCode);
+
+        HandleSpec inputSpec = firstSpec(command.getHandleSpecs(Handle.INPUT));
+        processError("\nInput bytes: " + inputBytes + " bytes " +
+                     describe(inputSpec));
+
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        HandleSpec outputSpec = firstSpec(outputSpecs);
+        processError("\nOutput bytes: " + outputBytes + " bytes " +
+                     describe(outputSpec));
+        if (outputSpecs != null) {
+            for (int i=1; i < outputSpecs.size(); ++i) {
+                HandleSpec spec = outputSpecs.get(i);
+                processError("\n           " + new File(spec.getName()).length()
+                             + " bytes using " + spec.getSpec());
+            }
+        }
+
+        processError("\n=====          * * *          =====\n");
+    }
+
+    /** First spec of the list, or null when the list is null or empty. */
+    private static HandleSpec firstSpec(List<HandleSpec> specs) {
+        return (specs == null || specs.isEmpty()) ? null : specs.get(0);
+    }
+
+    /** "(name using spec)" for a non-null spec, "" otherwise. */
+    private static String describe(HandleSpec spec) {
+        if (spec == null) {
+            return "";
+        }
+        return "(" + spec.getName() + " using " + spec.getSpec() + ")";
+    }
+}
+
+    
\ No newline at end of file

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultInputHandler} handles the input for the Pig-Streaming
+ * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input
+ * via its <code>stdin</code>.  
+ */
+public class DefaultInputHandler extends InputHandler {
+    
+    OutputStream stdin;
+    
+    public DefaultInputHandler() {
+        serializer = new PigStorage();
+    }
+    
+    public DefaultInputHandler(HandleSpec spec) {
+        serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+    }
+    
+    public InputType getInputType() {
+        return InputType.SYNCHRONOUS;
+    }
+    
+    public void bindTo(OutputStream os) throws IOException {
+        stdin = os;
+        super.bindTo(stdin);
+    }
+    
+    public void close() throws IOException {
+        super.close();
+        stdin.flush();
+        stdin.close();
+        stdin = null;
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output
+ * via its <code>stdout</code>.
+ */
+public class DefaultOutputHandler extends OutputHandler {
+
+    /**
+     * Creates a handler which deserializes the executable's output with
+     * {@link PigStorage}.
+     */
+    public DefaultOutputHandler() {
+        deserializer = new PigStorage();
+    }
+
+    /**
+     * Creates a handler which deserializes the executable's output with the
+     * {@link LoadFunc} named by the given handle specification.
+     *
+     * @param spec handle specification naming the deserializer
+     */
+    public DefaultOutputHandler(HandleSpec spec) {
+        deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+    }
+
+    /**
+     * @return {@link OutputType#SYNCHRONOUS}: the output is consumed from the
+     *         process' <code>stdout</code> rather than from a file
+     */
+    public OutputType getOutputType() {
+        return OutputType.SYNCHRONOUS;
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.streaming.InputHandler.InputType;
+import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+
+/**
+ * {@link ExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ * 
+ * The <code>ExecutableManager</code> is responsible for startup/teardown of the 
+ * external process and also for managing it.
+ * It feeds input records to the executable via its <code>stdin</code>, 
+ * collects the output records from the <code>stdout</code> and also diagnostic 
+ * information from the <code>stderr</code>.
+ */
+public class ExecutableManager {
+    private static final Log LOG =
+        LogFactory.getLog(ExecutableManager.class.getName());
+    private static final int SUCCESS = 0;
+
+    protected StreamingCommand command;        // Streaming command to be run
+    String[] argv;                             // Parsed/split commands
+
+    Process process;                           // Handle to the process
+    protected int exitCode = -127;             // Exit code of the process
+
+    protected DataOutputStream stdin;          // stdin of the process
+
+    ProcessOutputThread stdoutThread;          // thread to get process output
+    InputStream stdout;                        // stdout of the process
+
+    ProcessErrorThread stderrThread;           // thread to get process stderr
+    InputStream stderr;                        // stderr of the process
+
+    DataCollector endOfPipe;                   // successor in the pipeline
+
+    // Input/Output handlers
+    InputHandler inputHandler;
+    OutputHandler outputHandler;
+
+    Properties properties;
+
+    protected long inputBytes = 0;
+    protected long outputBytes = 0;
+
+    public ExecutableManager() {}
+
+    /**
+     * Configures the manager: records the command and its arguments, creates
+     * the input/output handlers for it and remembers the successor.
+     *
+     * @param properties configuration properties
+     * @param command streaming command to run
+     * @param endOfPipe successor which receives the output tuples
+     * @throws IOException declared for subclasses
+     * @throws ExecException if a handler cannot be created
+     */
+    public void configure(Properties properties, StreamingCommand command,
+                          DataCollector endOfPipe)
+    throws IOException, ExecException {
+        this.properties = properties;
+
+        this.command = command;
+        this.argv = this.command.getCommandArgs();
+
+        // Create the input/output handlers
+        this.inputHandler = HandlerFactory.createInputHandler(command);
+        this.outputHandler =
+            HandlerFactory.createOutputHandler(command);
+
+        // Successor
+        this.endOfPipe = endOfPipe;
+    }
+
+    /**
+     * Finishes the input, waits for the process and its reader threads, and
+     * for file-based (asynchronous) output processes the output file.
+     *
+     * @throws IOException if closing a handler or starting the process fails
+     * @throws ExecException if the process exits with a non-zero status
+     */
+    public void close() throws IOException, ExecException {
+        // Close the InputHandler, which in some cases lets the process
+        // terminate
+        inputHandler.close();
+
+        // Check if we need to start the process now ...
+        if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+            exec();
+        }
+
+        // Wait for the process to exit and the stdout/stderr threads to complete
+        try {
+            exitCode = process.waitFor();
+
+            if (stdoutThread != null) {
+                stdoutThread.join(0);
+            }
+            if (stderrThread != null) {
+                stderrThread.join(0);
+            }
+        } catch (InterruptedException ie) {
+            // Restore the interrupt status; if waitFor() itself was
+            // interrupted exitCode keeps its sentinel and we fail below
+            Thread.currentThread().interrupt();
+        }
+
+        // Clean up the process
+        process.destroy();
+
+        LOG.debug("Process exited with: " + exitCode);
+        if (exitCode != SUCCESS) {
+            throw new ExecException(command + " failed with exit status: " +
+                                       exitCode);
+        }
+
+        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+            // Trigger the outputHandler
+            outputHandler.bindTo(null);
+
+            // Start the thread to process the output and wait for
+            // it to terminate
+            stdoutThread = new ProcessOutputThread(outputHandler);
+            stdoutThread.start();
+
+            try {
+                stdoutThread.join(0);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Starts the external process along with the thread draining its stderr
+     * and, for synchronous output, the thread reading its stdout.
+     *
+     * @throws IOException if the process cannot be started
+     */
+    protected void exec() throws IOException {
+        // Unquote single-quoted command-line arguments. Empty arguments and
+        // a lone quote character are left as-is (they used to crash here).
+        for (int i = 0; i < argv.length; ++i) {
+            String arg = argv[i];
+            if (arg.length() >= 2 && arg.charAt(0) == '\''
+                    && arg.charAt(arg.length() - 1) == '\'') {
+                argv[i] = arg.substring(1, arg.length() - 1);
+            }
+        }
+
+        // Start the external process
+        ProcessBuilder processBuilder = new ProcessBuilder(argv);
+        process = processBuilder.start();
+        LOG.debug("Started the process for command: " + command);
+
+        // Pick up the process' stderr stream and start the thread to
+        // process the stderr stream
+        stderr =
+            new DataInputStream(new BufferedInputStream(process.getErrorStream()));
+        stderrThread = new ProcessErrorThread();
+        stderrThread.start();
+
+        // Check if we need to handle the process' stdout directly
+        if (outputHandler.getOutputType() == OutputType.SYNCHRONOUS) {
+            // Get hold of the stdout of the process
+            stdout =
+                new DataInputStream(new BufferedInputStream(process.getInputStream()));
+
+            // Bind the stdout to the OutputHandler
+            outputHandler.bindTo(stdout);
+
+            // Start the thread to process the executable's stdout
+            stdoutThread = new ProcessOutputThread(outputHandler);
+            stdoutThread.start();
+        }
+    }
+
+    /**
+     * Starts the process and binds its stdin to the input handler, unless
+     * the input is file-based (asynchronous), in which case the process is
+     * only started from close().
+     *
+     * @throws IOException if the process cannot be started or bound
+     */
+    public void run() throws IOException {
+        // Check if we need to exec the process NOW ...
+        if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+            return;
+        }
+
+        // Start the executable ...
+        exec();
+        stdin =
+            new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
+        inputHandler.bindTo(stdin);
+    }
+
+    /**
+     * Passes one input tuple to the executable via the InputHandler.
+     *
+     * @param d datum to feed; it is cast to {@link Tuple}
+     * @throws IOException if serializing the tuple fails
+     */
+    public void add(Datum d) throws IOException {
+        // Pass the serialized tuple to the executable via the InputHandler
+        Tuple t = (Tuple)d;
+        inputHandler.putNext(t);
+        inputBytes += t.getMemorySize();
+    }
+
+    /**
+     * Workhorse to process the output of the managed process.
+     * 
+     * The <code>ExecutableManager</code>, by default, just pushes the received
+     * <code>Datum</code> into eval-pipeline to be processed by the successor.
+     * 
+     * @param d <code>Datum</code> to process
+     */
+    protected void processOutput(Datum d) {
+        endOfPipe.add(d);
+    }
+
+    /** Daemon thread reading tuples from an OutputHandler until exhausted. */
+    class ProcessOutputThread extends Thread {
+
+        OutputHandler outputHandler;
+
+        ProcessOutputThread(OutputHandler outputHandler) {
+            setDaemon(true);
+            this.outputHandler = outputHandler;
+        }
+
+        public void run() {
+            try {
+                // Read tuples from the executable and push them down the pipe
+                Tuple tuple = null;
+                while ((tuple = outputHandler.getNext()) != null) {
+                    processOutput(tuple);
+                    outputBytes += tuple.getMemorySize();
+                }
+
+                outputHandler.close();
+            } catch (Throwable t) {
+                LOG.warn(t);
+                try {
+                    outputHandler.close();
+                } catch (IOException ioe) {
+                    LOG.info(ioe);
+                }
+                throw new RuntimeException(t);
+            }
+        }
+    }
+
+    /**
+     * Workhorse to process the stderr stream of the managed process.
+     * 
+     * By default <code>ExecutableManager</code> just sends out the received
+     * error message to the <code>stderr</code> of itself.
+     * 
+     * @param error error message from the managed process.
+     */
+    protected void processError(String error) {
+        // Just send it out to our stderr
+        System.err.print(error);
+    }
+
+    /** Daemon thread draining the process' stderr line by line. */
+    class ProcessErrorThread extends Thread {
+
+        public ProcessErrorThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            try {
+                String error;
+                BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(stderr));
+                while ((error = reader.readLine()) != null) {
+                    processError(error+"\n");
+                }
+
+                if (stderr != null) {
+                    stderr.close();
+                    LOG.debug("ProcessErrorThread done");
+                }
+            } catch (Throwable th) {
+                // Draining stderr is best-effort: log the failure and make
+                // sure the stream gets closed. Rethrowing here was only ever
+                // reached when close() also failed, and it would merely kill
+                // this daemon thread.
+                LOG.warn(th);
+                try {
+                    if (stderr != null) {
+                        stderr.close();
+                    }
+                } catch (IOException ioe) {
+                    LOG.info(ioe);
+                }
+            }
+        }
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileInputHandler} handles the input for the Pig-Streaming
+ * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input
+ * via an external file specified by the user.  
+ */
+public class FileInputHandler extends InputHandler {
+
+    String fileName;
+    OutputStream fileOutStream;
+    
+    public FileInputHandler(HandleSpec handleSpec) throws ExecException {
+        fileName = handleSpec.name;
+        serializer = 
+            (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+        
+        try {
+            fileOutStream = new FileOutputStream(new File(fileName)); 
+            super.bindTo(fileOutStream);
+        } catch (IOException fnfe) {
+            throw new ExecException(fnfe);
+        }
+    }
+
+    public InputType getInputType() {
+        return InputType.ASYNCHRONOUS;
+    }
+    
+    public void bindTo(OutputStream os) throws IOException {
+        throw new UnsupportedOperationException("Cannot call bindTo on " +
+        		                                "FileInputHandler");
+    }
+    
+    public void close() throws IOException {
+        super.close();
+        fileOutStream.flush();
+        fileOutStream.close();
+        fileOutStream = null;
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from
+ * an external file specified by the user.  
+ */
+public class FileOutputHandler extends OutputHandler {
+
+    String fileName;
+    InputStream fileInStream;
+    
+    public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
+        fileName = handleSpec.name;
+        deserializer = 
+            (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+    }
+
+    public OutputType getOutputType() {
+        return OutputType.ASYNCHRONOUS;
+    }
+    
+    public void bindTo(InputStream is) throws IOException {
+        // This is a trigger to start processing the output from the file ...
+        fileInStream = new FileInputStream(new File(fileName)); 
+        super.bindTo(fileInStream);
+    }
+    
+    public void close() throws IOException {
+        super.close();
+        fileInStream.close();
+        fileInStream = null;
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * Factory to create an {@link InputHandler} or {@link OutputHandler}
+ * depending on the specification of the {@link StreamingCommand}.
+ * 
+ * Only the first specification attached to a handle is consulted; when
+ * there is none, the default (<code>stdin</code>/<code>stdout</code>) 
+ * handlers are used.
+ */
+public class HandlerFactory {
+
+    /**
+     * Create an <code>InputHandler</code> for the given input specification
+     * of the <code>StreamingCommand</code>.
+     * 
+     * @param command <code>StreamingCommand</code>
+     * @return a <code>DefaultInputHandler</code> when no input specification
+     *         is present or the first one is named <code>stdin</code>,
+     *         otherwise a <code>FileInputHandler</code> for the named file
+     * @throws ExecException propagated from the handler constructors
+     */
+    public static InputHandler createInputHandler(StreamingCommand command) 
+    throws ExecException {
+        HandleSpec in = firstSpec(command.getHandleSpecs(Handle.INPUT));
+        if (in == null) {
+            return new DefaultInputHandler();
+        }
+        
+        return (in.name.equals("stdin")) ? new DefaultInputHandler(in) :
+                                           new FileInputHandler(in);
+    }
+    
+    /**
+     * Create an <code>OutputHandler</code> for the given output specification
+     * of the <code>StreamingCommand</code>.
+     * 
+     * @param command <code>StreamingCommand</code>
+     * @return a <code>DefaultOutputHandler</code> when no output 
+     *         specification is present or the first one is named 
+     *         <code>stdout</code>, otherwise a <code>FileOutputHandler</code>
+     *         for the named file
+     * @throws ExecException propagated from the handler constructors
+     */
+    public static OutputHandler createOutputHandler(StreamingCommand command) 
+    throws ExecException {
+        HandleSpec out = firstSpec(command.getHandleSpecs(Handle.OUTPUT));
+        if (out == null) {
+            return new DefaultOutputHandler();
+        }
+        
+        return (out.name.equals("stdout")) ? new DefaultOutputHandler(out) :
+                                             new FileOutputHandler(out);
+    }
+
+    /**
+     * Return the first specification of the given list, or <code>null</code>
+     * when the list is <code>null</code> or empty (the list returned by
+     * <code>getHandleSpecs</code> is mutable, so it may have been emptied).
+     */
+    private static HandleSpec firstSpec(List<HandleSpec> specs) {
+        return (specs == null || specs.isEmpty()) ? null : specs.get(0);
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link InputHandler} is responsible for handling the input to the 
+ * Pig-Streaming external command.
+ * 
+ * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS} 
+ * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS} 
+ * manner via an external file which is subsequently read by the executable.
+ */
+public abstract class InputHandler {
+    /**
+     * How input is delivered to the managed process: 
+     * <code>SYNCHRONOUS</code> via its <code>stdin</code>, 
+     * <code>ASYNCHRONOUS</code> via an external file read by the process.
+     */
+    public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
+    
+    /*
+     * The serializer used to write input tuples for the managed process.
+     * 
+     * It is the responsibility of the concrete sub-classes to setup and
+     * manage the serializer; the methods below assume it is non-null.
+     */  
+    protected StoreFunc serializer;
+    
+    /**
+     * Get the handled <code>InputType</code>.
+     * @return the handled <code>InputType</code>
+     */
+    public abstract InputType getInputType();
+    
+    /**
+     * Send the given input <code>Tuple</code> to the managed executable by
+     * passing it to the serializer.
+     * 
+     * @param t input <code>Tuple</code>
+     * @throws IOException if the serializer fails to write the tuple
+     */
+    public void putNext(Tuple t) throws IOException {
+        serializer.putNext(t);
+    }
+    
+    /**
+     * Close the <code>InputHandler</code> since there is no more input
+     * to be sent to the managed process. This finishes the serializer.
+     * 
+     * @throws IOException if the serializer fails to finish
+     */
+    public void close() throws IOException {
+        serializer.finish();
+    }
+    
+    /**
+     * Bind the <code>InputHandler</code> to the <code>OutputStream</code>
+     * to which the serialized input of the managed process is written.
+     * 
+     * @param os <code>OutputStream</code> to which the serializer writes the
+     *           input data for the managed process
+     * @throws IOException if the serializer cannot be bound to the stream
+     */
+    public void bindTo(OutputStream os) throws IOException {
+        serializer.bindTo(os);
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link OutputHandler} is responsible for handling the output of the 
+ * Pig-Streaming external command.
+ * 
+ * The output of the managed executable could be fetched in a 
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an 
+ * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
+ * process wrote its output.
+ */
+public abstract class OutputHandler {
+    /**
+     * How the output of the managed process is fetched:
+     * <code>SYNCHRONOUS</code> via its <code>stdout</code>,
+     * <code>ASYNCHRONOUS</code> via an external file written by the process.
+     */
+    public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+    /*
+     * The deserializer used to read the output data of the managed process.
+     * 
+     * It is the responsibility of the concrete sub-classes to setup and
+     * manage the deserializer; the methods below assume it is non-null.
+     */  
+    protected LoadFunc deserializer;
+    
+    /**
+     * Get the handled <code>OutputType</code>.
+     * @return the handled <code>OutputType</code> 
+     */
+    public abstract OutputType getOutputType();
+    
+    /**
+     * Bind the <code>OutputHandler</code> to the <code>InputStream</code>
+     * from which to read the output data of the managed process.
+     * 
+     * @param is <code>InputStream</code> from which to read the output data 
+     *           of the managed process
+     * @throws IOException if the deserializer cannot be bound to the stream
+     */
+    public void bindTo(InputStream is) throws IOException {
+        // Bind over the whole stream: no file name, starting at offset 0
+        // with no upper bound.
+        deserializer.bindTo("", new BufferedPositionedInputStream(is), 0, 
+                            Long.MAX_VALUE);
+    }
+    
+    /**
+     * Get the next output <code>Tuple</code> of the managed process.
+     * 
+     * @return the next output <code>Tuple</code> of the managed process, as
+     *         returned by the deserializer
+     * @throws IOException if the deserializer fails to read
+     */
+    public Tuple getNext() throws IOException {
+        return deserializer.getNext();
+    }
+    
+    /**
+     * Close the <code>OutputHandler</code>. This default implementation
+     * does nothing; sub-classes owning resources should override it.
+     * @throws IOException if closing fails
+     */
+    public void close() throws IOException {}
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=641943&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Mar 27 12:01:32 2008
@@ -0,0 +1,347 @@
+package org.apache.pig.impl.streaming;
+
+import java.io.Serializable;
+import java.util.EnumMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.pig.builtin.PigStorage;
+
+
+/**
+ * {@link StreamingCommand} represents the specification of an external
+ * command to be executed in a Pig Query. 
+ * 
+ * <code>StreamingCommand</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class StreamingCommand implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // External command to be executed and its parsed components
+    String executable;                   
+    String[] argv;
+    
+    // Files to be shipped to the cluster in-order to be executed
+    List<String> shipSpec = new LinkedList<String>();                  
+
+    // Files (already on the shared filesystem) to be cached on the 
+    // execute nodes
+    List<String> cacheSpec = new LinkedList<String>();                 
+
+    /**
+     * Handle to communicate with the external process.
+     */
+    public enum Handle {INPUT, OUTPUT}
+    
+    /**
+     * Map from the input/output handles to their specifications.
+     */
+    Map<Handle, List<HandleSpec>> handleSpecs = 
+        new EnumMap<Handle, List<HandleSpec>>(Handle.class);
+    
+    // Should the stderr of the process be persisted?
+    boolean persistStderr = false;
+    
+    // Directory where the process's stderr logs should be persisted.
+    String logDir;
+    
+    // Limit on the number of persisted log-files 
+    int logFilesLimit = 100;
+    public static final int MAX_TASKS = 100;
+    
+    boolean shipFiles = true;
+    
+    /**
+     * Create a new <code>StreamingCommand</code> from its parsed arguments.
+     * 
+     * @param argv parsed arguments of the command; <code>argv[0]</code> is
+     *             taken to be the executable
+     * @throws IllegalArgumentException if <code>argv</code> is 
+     *                                  <code>null</code> or empty
+     */
+    public StreamingCommand(String[] argv) {
+        if (argv == null || argv.length == 0) {
+            throw new IllegalArgumentException(
+                    "StreamingCommand requires a non-empty argv");
+        }
+        this.argv = argv;
+
+        // Assume that argv[0] is the executable
+        this.executable = this.argv[0];
+    }
+    
+    /**
+     * Get the command to be executed.
+     * 
+     * @return the command to be executed
+     */
+    public String getExecutable() {
+        return executable;
+    }
+    
+    /**
+     * Set the executable for the <code>StreamingCommand</code>.
+     * 
+     * @param executable the executable for the <code>StreamingCommand</code>
+     */
+    public void setExecutable(String executable) {
+        this.executable = executable;
+    }
+    
+    /**
+     * Set the command line arguments for the <code>StreamingCommand</code>.
+     * This does not change the executable.
+     * 
+     * @param argv the command line arguments for the 
+     *             <code>StreamingCommand</code>
+     */
+    public void setCommandArgs(String[] argv) {
+        this.argv = argv;
+    }
+    
+    /**
+     * Get the parsed command arguments.
+     * 
+     * @return the parsed command arguments as <code>String[]</code>
+     */
+    public String[] getCommandArgs() {
+        return argv;
+    }
+
+    /**
+     * Get the list of files which need to be shipped to the cluster.
+     * 
+     * @return the (live, modifiable) list of files which need to be shipped
+     *         to the cluster
+     */
+    public List<String> getShipSpecs() {
+        return shipSpec;
+    }
+    
+    /**
+     * Get the list of files which need to be cached on the execute nodes.
+     * 
+     * @return the (live, modifiable) list of files which need to be cached
+     *         on the execute nodes
+     */
+    public List<String> getCacheSpecs() {
+        return cacheSpec;
+    }
+
+    /**
+     * Add a file to be shipped to the cluster. 
+     * 
+     * Users can use this to distribute executables and other necessary files
+     * to the clusters.
+     * 
+     * @param path path of the file to be shipped to the cluster
+     */
+    public void addPathToShip(String path) {
+        shipSpec.add(path);
+    }
+
+    /**
+     * Add a file to be cached on execute nodes on the cluster. The file is
+     * assumed to be available at the shared filesystem.
+     * 
+     * @param path path of the file to be cached on the execute nodes
+     */
+    public void addPathToCache(String path) {
+        cacheSpec.add(path);
+    }
+
+    /**
+     * Attach a {@link HandleSpec} to a given {@link Handle}.
+     * 
+     * @param handle <code>Handle</code> to which the specification is to 
+     *               be attached.
+     * @param handleSpec <code>HandleSpec</code> for the given handle.
+     */
+    public void addHandleSpec(Handle handle, HandleSpec handleSpec) {
+        List<HandleSpec> handleSpecList = handleSpecs.get(handle);
+        
+        if (handleSpecList == null) {
+            handleSpecList = new LinkedList<HandleSpec>();
+            handleSpecs.put(handle, handleSpecList);
+        }
+        
+        handleSpecList.add(handleSpec);
+    }
+    
+    /**
+     * Get specifications for the given <code>Handle</code>.
+     * 
+     * @param handle <code>Handle</code> of the stream
+     * @return the (live, modifiable) list of specifications for the given
+     *         <code>Handle</code>, or <code>null</code> if none was added
+     */
+    public List<HandleSpec> getHandleSpecs(Handle handle) {
+        return handleSpecs.get(handle);
+    }
+    
+    /**
+     * Should the stderr of the managed process be persisted?
+     * 
+     * @return <code>true</code> if the stderr of the managed process should be
+     *         persisted, <code>false</code> otherwise.
+     */
+    public boolean getPersistStderr() {
+        return persistStderr;
+    }
+
+    /**
+     * Specify if the stderr of the managed process should be persisted.
+     * 
+     * @param persistStderr <code>true</code> if the stderr of the managed 
+     *                      process should be persisted, else <code>false</code>
+     */
+    public void setPersistStderr(boolean persistStderr) {
+        this.persistStderr = persistStderr;
+    }
+
+    /**
+     * Get the directory where the log-files of the command are persisted.
+     * 
+     * @return the directory where the log-files of the command are persisted
+     */
+    public String getLogDir() {
+        return logDir;
+    }
+
+    /**
+     * Set the directory where the log-files of the command are persisted.
+     * A single leading <code>/</code> is stripped, and persisting of stderr
+     * is switched on.
+     * 
+     * @param logDir the directory where the log-files of the command are persisted
+     */
+    public void setLogDir(String logDir) {
+        this.logDir = logDir;
+        if (this.logDir.startsWith("/")) {
+            this.logDir = this.logDir.substring(1);
+        }
+        setPersistStderr(true);
+    }
+
+    /**
+     * Get the maximum number of tasks whose stderr logs files are persisted.
+     * 
+     * @return the maximum number of tasks whose stderr logs files are persisted
+     */
+    public int getLogFilesLimit() {
+        return logFilesLimit;
+    }
+
+    /**
+     * Set the maximum number of tasks whose stderr logs files are persisted.
+     * The value is capped at {@link #MAX_TASKS}.
+     * 
+     * @param logFilesLimit the maximum number of tasks whose stderr logs files 
+     *                      are persisted
+     */
+    public void setLogFilesLimit(int logFilesLimit) {
+        this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit);
+    }
+
+    /**
+     * Set whether files should be shipped or not.
+     * 
+     * @param shipFiles <code>true</code> if files of this command should be
+     *                  shipped, <code>false</code> otherwise
+     */
+    public void setShipFiles(boolean shipFiles) {
+        this.shipFiles = shipFiles;
+    }
+
+    /**
+     * Get whether files for this command should be shipped or not.
+     * 
+     * @return <code>true</code> if files of this command should be shipped, 
+     *         <code>false</code> otherwise
+     */
+    public boolean getShipFiles() {
+        return shipFiles;
+    }
+
+    @Override
+    public String toString() {
+        return executable;
+    }
+    
+    /**
+     * Specification about the usage of the {@link Handle} to communicate
+     * with the external process.
+     * 
+     * It specifies the stream-handle which can be one of <code>stdin</code>/
+     * <code>stdout</code>/<code>stderr</code> or a named file and also the
+     * serializer/deserializer specification to be used to read/write data 
+     * to/from the stream.
+     */
+    public static class HandleSpec 
+    implements Comparable<HandleSpec>, Serializable {
+        private static final long serialVersionUID = 1L;
+
+        String name;
+        String spec;
+        
+        /**
+         * Create a new {@link HandleSpec} with a given name using the default
+         * {@link PigStorage} serializer/deserializer.
+         * 
+         * @param handleName name of the handle (one of <code>stdin</code>,
+         *                   <code>stdout</code> or a file-path)
+         */
+        public HandleSpec(String handleName) {
+            this(handleName, PigStorage.class.getName());
+        }
+        
+        /**
+         * Create a new {@link HandleSpec} with a given name and the given
+         * serializer/deserializer specification.
+         * 
+         * @param handleName name of the handle (one of <code>stdin</code>,
+         *                   <code>stdout</code> or a file-path)
+         * @param spec serializer/deserializer spec
+         */
+        public HandleSpec(String handleName, String spec) {
+            this.name = handleName;
+            this.spec = spec;
+        }
+
+        /**
+         * Order <code>HandleSpec</code>s by their name only; the spec is
+         * not taken into account.
+         */
+        public int compareTo(HandleSpec o) {
+            return this.name.compareTo(o.name);
+        }
+        
+        @Override
+        public String toString() {
+            return name + " using " + spec;
+        }
+
+        /**
+         * Get the <b>name</b> of the <code>HandleSpec</code>.
+         * 
+         * @return the <b>name</b> of the <code>HandleSpec</code> (one of 
+         *         <code>stdin</code>, <code>stdout</code> or a file-path)
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * Set the <b>name</b> of the <code>HandleSpec</code>.
+         * 
+         * @param name <b>name</b> of the <code>HandleSpec</code> (one of 
+         *         <code>stdin</code>, <code>stdout</code> or a file-path)
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Get the serializer/deserializer spec of the <code>HandleSpec</code>.
+         * 
+         * @return the serializer/deserializer spec of the 
+         *         <code>HandleSpec</code>
+         */
+        public String getSpec() {
+            return spec;
+        }
+
+        /**
+         * Set the serializer/deserializer spec of the <code>HandleSpec</code>.
+         * 
+         * @param spec the serializer/deserializer spec of the 
+         *             <code>HandleSpec</code>
+         */
+        public void setSpec(String spec) {
+            this.spec = spec;
+        }
+    }
+}
\ No newline at end of file