Posted to commits@pig.apache.org by da...@apache.org on 2013/09/23 17:56:17 UTC

svn commit: r1525632 [1/3] - in /pig/trunk: ./ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/streaming/ src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/streaming/ src/org/apache/pig/scripting/streamin...

Author: daijy
Date: Mon Sep 23 15:56:16 2013
New Revision: 1525632

URL: http://svn.apache.org/r1525632
Log:
PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no JVM implementation
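
In practical terms: a UDF can now be written in ordinary CPython and invoked from Pig with no JVM-side implementation. A minimal sketch of the intended usage, assuming the pig_util decorator added by this commit and the streaming_python registration keyword handled by the new PythonScriptEngine (file, alias, and function names below are illustrative):

    # myudfs.py -- a CPython UDF executed in an external python process
    from pig_util import outputSchema

    @outputSchema('word:chararray')
    def concat(first, second):
        # Input tuples are streamed to this process over stdin and results
        # are streamed back over stdout; see StreamingUDF.java below.
        return first + second

In Pig Latin this would be registered and called along the lines of
REGISTER 'myudfs.py' USING streaming_python AS myfuncs; followed by
GENERATE myfuncs.concat(a, b);.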

Added:
    pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
    pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java
    pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java
    pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
    pig/trunk/src/org/apache/pig/scripting/streaming/
    pig/trunk/src/org/apache/pig/scripting/streaming/python/
    pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
    pig/trunk/src/python/
    pig/trunk/src/python/streaming/
    pig/trunk/src/python/streaming/controller.py
    pig/trunk/src/python/streaming/pig_util.py
    pig/trunk/test/e2e/pig/udfs/cpython/
    pig/trunk/test/e2e/pig/udfs/cpython/morepythonudfs.py
    pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py
    pig/trunk/test/org/apache/pig/impl/builtin/
    pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
    pig/trunk/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java
    pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
    pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUtil.java
    pig/trunk/test/org/apache/pig/test/TestPigStreaming.java
    pig/trunk/test/python/
    pig/trunk/test/python/streaming/
    pig/trunk/test/python/streaming/test_controller.py
Removed:
    pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
    pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
    pig/trunk/test/e2e/pig/build.xml
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/excluded-tests-23
    pig/trunk/test/unit-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 23 15:56:16 2013
@@ -30,6 +30,9 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no
+          JVM implementation. (jeremykarn via daijy)
+
 PIG-3199: Provide a method to retrieve name of loader/storer in PigServer (prkommireddi via daijy)
 
 PIG-3367: Add assert keyword (operator) in pig (aniket486)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Sep 23 15:56:16 2013
@@ -40,6 +40,7 @@
     <!-- source properties -->
     <property name="lib.dir" value="${basedir}/lib" />
     <property name="src.dir" value="${basedir}/src" />
+    <property name="python.src.dir" value="${src.dir}/python" />
     <property name="src.lib.dir" value="${basedir}/lib-src" />
     <property name="src.gen.dir" value="${basedir}/src-gen" />
     <property name="docs.dir" value="${basedir}/src/docs" />
@@ -583,6 +584,9 @@
             </javac>
             <copy file="${src.dir}/org/apache/pig/tools/grunt/autocomplete" todir="${build.classes}/org/apache/pig/tools/grunt"/>
             <copy file="${src.dir}/org/apache/pig/tools/grunt/autocomplete_aliases" todir="${build.classes}/org/apache/pig/tools/grunt"/>
+            <copy todir="${build.classes}/python">
+                <fileset dir="${python.src.dir}"/>
+            </copy>
         </sequential>
     </macrodef>
 
@@ -651,6 +655,7 @@
             </manifest>
             <fileset dir="${src.lib.dir}/shock"/>
             <fileset dir="${src.lib.dir}/bzip2"/>
+            <fileset dir="${python.src.dir}"/>
         </jar>
     </target>
     <target name="javadoc-jar" depends="cc-compile, javadoc">

Added: pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java (added)
+++ pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,463 @@
+package org.apache.pig.impl.builtin;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.streaming.InputHandler;
+import org.apache.pig.impl.streaming.OutputHandler;
+import org.apache.pig.impl.streaming.PigStreamingUDF;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingUDFException;
+import org.apache.pig.impl.streaming.StreamingUDFInputHandler;
+import org.apache.pig.impl.streaming.StreamingUDFOutputHandler;
+import org.apache.pig.impl.streaming.StreamingUDFOutputSchemaException;
+import org.apache.pig.impl.streaming.StreamingUtil;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.scripting.ScriptingOutputCapturer;
+
+import com.google.common.base.Charsets;
+
+public class StreamingUDF extends EvalFunc<Object> {
+    private static final Log log = LogFactory.getLog(StreamingUDF.class);
+
+    private static final String PYTHON_CONTROLLER_JAR_PATH = "/python/streaming/controller.py"; //Relative to root of pig jar.
+    private static final String PYTHON_PIG_UTIL_PATH = "/python/streaming/pig_util.py"; //Relative to root of pig jar.
+
+    //Indexes for arguments being passed to external process
+    private static final int UDF_LANGUAGE = 0;
+    private static final int PATH_TO_CONTROLLER_FILE = 1;
+    private static final int UDF_FILE_NAME = 2; //Name of file where UDF function is defined
+    private static final int UDF_FILE_PATH = 3; //Path to directory containing file where UDF function is defined
+    private static final int UDF_NAME = 4; //Name of UDF function being called.
+    private static final int PATH_TO_FILE_CACHE = 5; //Directory where required files (like pig_util) are cached on cluster nodes.
+    private static final int STD_OUT_OUTPUT_PATH = 6; //File capturing output when user code writes to standard output.
+    private static final int STD_ERR_OUTPUT_PATH = 7; //File capturing output when user code writes to standard error.
+    private static final int CONTROLLER_LOG_FILE_PATH = 8; //Controller log file records progress through the controller script, not user code.
+    private static final int IS_ILLUSTRATE = 9; //Controller captures output differently in illustrate vs running.
+    
+    private String language;
+    private String filePath;
+    private String funcName;
+    private Schema schema;
+    private ExecType execType;
+    private String isIllustrate;
+    
+    private boolean initialized = false;
+    private ScriptingOutputCapturer soc;
+
+    private Process process; // Handle to the external process
+    private ProcessErrorThread stderrThread; // thread to get process stderr
+    private ProcessInputThread stdinThread; // thread to send input to process
+    private ProcessOutputThread stdoutThread; //thread to read output from process
+    
+    private InputHandler inputHandler;
+    private OutputHandler outputHandler;
+
+    private BlockingQueue<Tuple> inputQueue;
+    private BlockingQueue<Object> outputQueue;
+
+    private DataOutputStream stdin; // stdin of the process
+    private InputStream stdout; // stdout of the process
+    private InputStream stderr; // stderr of the process
+
+    private static final Object ERROR_OUTPUT = new Object();
+    private static final Object NULL_OBJECT = new Object(); //BlockingQueue can't hold null.  Use a placeholder object instead.
+    
+    private volatile StreamingUDFException outerrThreadsError;
+    
+    public static final String TURN_ON_OUTPUT_CAPTURING = "TURN_ON_OUTPUT_CAPTURING";
+
+    public StreamingUDF(String language, 
+                        String filePath, String funcName, 
+                        String outputSchemaString, String schemaLineNumber,
+                        String execType, String isIllustrate)
+                                throws StreamingUDFOutputSchemaException, ExecException {
+        this.language = language;
+        this.filePath = filePath;
+        this.funcName = funcName;
+        try {
+            this.schema = Utils.getSchemaFromString(outputSchemaString);
+            //ExecTypeProvider.fromString doesn't seem to load the ExecTypes in 
+            //mapreduce mode so we'll try to figure out the exec type ourselves.
+            if (execType.equals("local")) {
+                this.execType = ExecType.LOCAL;
+            } else if (execType.equals("mapreduce")) {
+                this.execType = ExecType.MAPREDUCE;
+            } else {
+                //Not sure what exec type - try to get it from the string.
+                this.execType = ExecTypeProvider.fromString(execType);
+            }
+        } catch (ParserException pe) {
+            throw new StreamingUDFOutputSchemaException(pe.getMessage(), Integer.valueOf(schemaLineNumber));
+        } catch (IOException ioe) {
+            String errorMessage = "Invalid exectype passed to StreamingUDF. Should be local or mapreduce";
+            log.error(errorMessage, ioe);
+            throw new ExecException(errorMessage, ioe);
+        }
+        this.isIllustrate = isIllustrate;
+    }
+    
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (!initialized) {
+            initialize();
+            initialized = true;
+        }
+        return getOutput(input);
+    }
+
+    private void initialize() throws ExecException, IOException {
+        inputQueue = new ArrayBlockingQueue<Tuple>(1);
+        outputQueue = new ArrayBlockingQueue<Object>(2);
+        soc = new ScriptingOutputCapturer(execType);
+        startUdfController();
+        createInputHandlers();
+        setStreams();
+        startThreads();
+    }
+
+    private StreamingCommand startUdfController() throws IOException {
+        StreamingCommand sc = new StreamingCommand(null, constructCommand());
+        ProcessBuilder processBuilder = StreamingUtil.createProcess(sc);
+        process = processBuilder.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new ProcessKiller() ) );
+        return sc;
+    }
+
+    private String[] constructCommand() throws IOException {
+        String[] command = new String[10];
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+
+        String jarPath = conf.get("mapred.jar");
+        String jobDir;
+        if (jarPath != null) {
+            jobDir = new File(jarPath).getParent();
+        } else {
+            jobDir = "";
+        }
+        
+        String standardOutputRootWriteLocation = soc.getStandardOutputRootWriteLocation();
+        String controllerLogFileName, outFileName, errOutFileName;
+
+        if (execType.isLocal()) {
+            controllerLogFileName = standardOutputRootWriteLocation + funcName + "_python.log";
+            outFileName = standardOutputRootWriteLocation + "cpython_" + funcName + "_" + ScriptingOutputCapturer.getRunId() + ".out";
+            errOutFileName = standardOutputRootWriteLocation + "cpython_" + funcName + "_" + ScriptingOutputCapturer.getRunId() + ".err";
+        } else {
+            controllerLogFileName = standardOutputRootWriteLocation + funcName + "_python.log";
+            outFileName = standardOutputRootWriteLocation + funcName + ".out";
+            errOutFileName = standardOutputRootWriteLocation + funcName + ".err";
+        }
+
+        soc.registerOutputLocation(funcName, outFileName);
+
+        command[UDF_LANGUAGE] = language;
+        command[PATH_TO_CONTROLLER_FILE] = getControllerPath(jobDir);
+        int lastSeparator = filePath.lastIndexOf(File.separator) + 1;
+        command[UDF_FILE_NAME] = filePath.substring(lastSeparator);
+        command[UDF_FILE_PATH] = lastSeparator <= 0 ? 
+                "." : 
+                filePath.substring(0, lastSeparator - 1);
+        command[UDF_NAME] = funcName;
+        command[PATH_TO_FILE_CACHE] = "\"" + jobDir + filePath.substring(0, lastSeparator) + "\"";
+        command[STD_OUT_OUTPUT_PATH] = outFileName;
+        command[STD_ERR_OUTPUT_PATH] = errOutFileName;
+        command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
+        command[IS_ILLUSTRATE] = isIllustrate;
+        return command;
+    }
+
+    private void createInputHandlers() throws ExecException, FrontendException {
+        PigStreamingUDF serializer = new PigStreamingUDF();
+        this.inputHandler = new StreamingUDFInputHandler(serializer);
+        PigStreamingUDF deserializer = new PigStreamingUDF(schema.getField(0));
+        this.outputHandler = new StreamingUDFOutputHandler(deserializer);
+    }
+
+    private void setStreams() throws IOException { 
+        stdout = new DataInputStream(new BufferedInputStream(process
+                .getInputStream()));
+        outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
+                0, Long.MAX_VALUE);
+        
+        stdin = new DataOutputStream(new BufferedOutputStream(process
+                .getOutputStream()));
+        inputHandler.bindTo(stdin); 
+        
+        stderr = new DataInputStream(new BufferedInputStream(process
+                .getErrorStream()));
+    }
+
+    private void startThreads() {
+        stdinThread = new ProcessInputThread();
+        stdinThread.start();
+        
+        stdoutThread = new ProcessOutputThread();
+        stdoutThread.start();
+        
+        stderrThread = new ProcessErrorThread();
+        stderrThread.start();
+    }
+
+    /**
+     * Find the path to the controller file for the streaming language.
+     *
+     * First check the path to the job jar; if the file is not found there (as
+     * when running hadoop in standalone mode), write the necessary files to
+     * temporary files and return that path.
+     *
+     * @param language
+     * @param jarPath
+     * @return
+     * @throws IOException
+     */
+    private String getControllerPath(String jarPath) throws IOException {
+        if (language.toLowerCase().equals("python")) {
+            String controllerPath = jarPath + PYTHON_CONTROLLER_JAR_PATH;
+            File controller = new File(controllerPath);
+            if (!controller.exists()) {
+                File controllerFile = File.createTempFile("controller", ".py");
+                InputStream pythonControllerStream = Launcher.class.getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
+                try {
+                    FileUtils.copyInputStreamToFile(pythonControllerStream, controllerFile);
+                } finally {
+                    pythonControllerStream.close();
+                }
+                controllerFile.deleteOnExit();
+                File pigUtilFile = new File(controllerFile.getParent() + "/pig_util.py");
+                pigUtilFile.deleteOnExit();
+                InputStream pythonUtilStream = Launcher.class.getResourceAsStream(PYTHON_PIG_UTIL_PATH);
+                try {
+                    FileUtils.copyInputStreamToFile(pythonUtilStream, pigUtilFile);
+                } finally {
+                    pythonUtilStream.close();
+                }
+                controllerPath = controllerFile.getAbsolutePath();
+            }
+            return controllerPath;
+        } else {
+            throw new ExecException("Invalid language: " + language);
+        }
+    }
+
+    /**
+     * Returns a list of file names (relative to root of pig jar) of files that need to be
+     * included in the jar shipped to the cluster.
+     *
+     * This will need to be smarter as more languages are added and the controller files grow larger.
+     *
+     * @return
+     */
+    public static List<String> getResourcesForJar() {
+        List<String> files = new ArrayList<String>();
+        files.add(PYTHON_CONTROLLER_JAR_PATH);
+        files.add(PYTHON_PIG_UTIL_PATH);
+        return files;
+    }
+
+    private Object getOutput(Tuple input) throws ExecException {
+        if (outputQueue == null) {
+            throw new ExecException("Process has already been shut down.  No way to retrieve output for input: " + input);
+        }
+
+        if (ScriptingOutputCapturer.isClassCapturingOutput() && 
+                !soc.isInstanceCapturingOutput()) {
+            Tuple t = TupleFactory.getInstance().newTuple(TURN_ON_OUTPUT_CAPTURING);
+            try {
+                inputQueue.put(t);
+            } catch (InterruptedException e) {
+                throw new ExecException("Failed adding capture input flag to inputQueue");
+            }
+            soc.setInstanceCapturingOutput(true);
+        }
+
+        try {
+            if (this.getInputSchema() == null || this.getInputSchema().size() == 0) {
+                //When nothing is passed into the UDF the tuple 
+                //being sent is the full tuple for the relation.
+                //We want it to be nothing (since that's what the user wrote).
+                input = TupleFactory.getInstance().newTuple(0);
+            }
+            inputQueue.put(input);
+        } catch (Exception e) {
+            throw new ExecException("Failed adding input to inputQueue", e);
+        }
+        Object o = null;
+        try {
+            if (outputQueue != null) {
+                o = outputQueue.take();
+                if (o == NULL_OBJECT) {
+                    o = null;
+                }
+            }
+        } catch (Exception e) {
+            throw new ExecException("Problem getting output", e);
+        }
+
+        if (o == ERROR_OUTPUT) {
+            outputQueue = null;
+            if (outerrThreadsError == null) {
+                outerrThreadsError = new StreamingUDFException(this.language, "Problem with streaming udf.  Can't recreate exception.");
+            }
+            throw outerrThreadsError;
+        }
+
+        return o;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return this.schema;
+    }
+
+    /**
+     * The thread which consumes input and feeds it to the Process
+     */
+    class ProcessInputThread extends Thread {
+        ProcessInputThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            try {
+                log.debug("Starting PIT");
+                while (true) {
+                    Tuple inputTuple = inputQueue.take();
+                    inputHandler.putNext(inputTuple);
+                    try {
+                        stdin.flush();
+                    } catch(Exception e) {
+                        return;
+                    }
+                }
+            } catch (Exception e) {
+                log.error(e);
+            }
+        }
+    }
+    
+    private static final int WAIT_FOR_ERROR_LENGTH = 500;
+    private static final int MAX_WAIT_FOR_ERROR_ATTEMPTS = 5;
+    
+    /**
+     * The thread which consumes output from process
+     */
+    class ProcessOutputThread extends Thread {
+        ProcessOutputThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            Object o = null;
+            try{
+                log.debug("Starting POT");
+                //StreamUDFToPig wraps object in single element tuple
+                o = outputHandler.getNext().get(0);
+                while (o != OutputHandler.END_OF_OUTPUT) {
+                    if (o != null)
+                        outputQueue.put(o);
+                    else
+                        outputQueue.put(NULL_OBJECT);
+                    o = outputHandler.getNext().get(0);
+                }
+            } catch(Exception e) {
+                if (outputQueue != null) {
+                    try {
+                        //Give error thread a chance to check the standard error output
+                        //for an exception message.
+                        int attempt = 0;
+                        while (stderrThread.isAlive() && attempt < MAX_WAIT_FOR_ERROR_ATTEMPTS) {
+                            Thread.sleep(WAIT_FOR_ERROR_LENGTH);
+                            attempt++;
+                        }
+                        //Only write this if no other error.  Don't want to overwrite
+                        //an error from the error thread.
+                        if (outerrThreadsError == null) {
+                            outerrThreadsError = new StreamingUDFException(language, "Error deserializing output.  Please check that the declared outputSchema for function " +
+                                                                        funcName + " matches the data type being returned.", e);
+                        }
+                        outputQueue.put(ERROR_OUTPUT); //Need to wake main thread.
+                    } catch(InterruptedException ie) {
+                        log.error(ie);
+                    }
+                }
+            }
+        }
+    }
+
+    class ProcessErrorThread extends Thread {
+        public ProcessErrorThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            try {
+                log.debug("Starting PET");
+                Integer lineNumber = null;
+                StringBuffer error = new StringBuffer();
+                String errInput;
+                BufferedReader reader = new BufferedReader(
+                        new InputStreamReader(stderr, Charsets.UTF_8));
+                while ((errInput = reader.readLine()) != null) {
+                    //The first line of the error stream is usually the line number of the error.
+                    //If it's not a number, just treat it as the first line of the error message.
+                    if (lineNumber == null) {
+                        try {
+                            lineNumber = Integer.valueOf(errInput);
+                        } catch (NumberFormatException nfe) {
+                            error.append(errInput + "\n");
+                        }
+                    } else {
+                        error.append(errInput + "\n");
+                    }
+                }
+                outerrThreadsError = new StreamingUDFException(language, error.toString(), lineNumber);
+                if (outputQueue != null) {
+                    outputQueue.put(ERROR_OUTPUT); //Need to wake main thread.
+                }
+                if (stderr != null) {
+                    stderr.close();
+                    stderr = null;
+                }
+            } catch (IOException e) {
+                log.debug("Process Ended");
+            } catch (Exception e) {
+                log.error("standard error problem", e);
+            }
+        }
+    }
+    
+    public class ProcessKiller implements Runnable {
+        public void run() {
+            process.destroy();
+        }
+    }
+}
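
The three daemon threads above talk to controller.py (added by this commit; its source appears in a later part of this message) over a simple request/response protocol: the input thread serializes one tuple per request to the child's stdin, the output thread blocks on deserialized results from its stdout, and the error thread watches stderr for a line number followed by the error text. A minimal, hypothetical sketch of the child side of that protocol, simplified to tab/newline framing (the real controller uses the wrapped delimiters defined in StreamingDelimiters below):

    # Hypothetical, simplified controller loop; framing is illustrative only.
    import sys, traceback

    def run(udf):
        for line in sys.stdin:                        # one serialized tuple per line
            try:
                args = line.rstrip('\n').split('\t')  # simplified deserialization
                sys.stdout.write(str(udf(*args)) + '\n')  # simplified serialization
                sys.stdout.flush()                    # pairs with stdin.flush() in the Java input thread
            except Exception:
                _, _, tb = sys.exc_info()
                while tb.tb_next:                     # innermost frame = failing UDF line
                    tb = tb.tb_next
                # ProcessErrorThread expects the line number first, then the message.
                sys.stderr.write('%d\n' % tb.tb_lineno)
                sys.stderr.write(traceback.format_exc())
                sys.exit(1)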

Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Mon Sep 23 15:56:16 2013
@@ -17,32 +17,24 @@
  */
 package org.apache.pig.impl.streaming;
 
-import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.streaming.InputHandler.InputType;
 import org.apache.pig.impl.streaming.OutputHandler.OutputType;
 import org.apache.pig.impl.util.UDFContext;
@@ -60,12 +52,9 @@ import org.apache.pig.impl.util.UDFConte
 public class ExecutableManager {
     private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
     private static final int SUCCESS = 0;
-    private static final String PATH = "PATH";
-    private static final String BASH = "bash";
     private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
 
     protected StreamingCommand command; // Streaming command to be run
-    String argvAsString; // Parsed commands
 
     Process process; // Handle to the process
     protected int exitCode = -127; // Exit code of the process
@@ -109,12 +98,6 @@ public class ExecutableManager {
     public void configure(POStream stream) throws IOException, ExecException {
         this.poStream = stream;
         this.command = stream.getCommand();
-        String[] argv = this.command.getCommandArgs();
-        argvAsString = "";
-        for (String arg : argv) {
-            argvAsString += arg;
-            argvAsString += " ";
-        }
 
         // Create the input/output handlers
         this.inputHandler = HandlerFactory.createInputHandler(command);
@@ -204,57 +187,6 @@ public class ExecutableManager {
     }
 
     /**
-     * Set up the run-time environment of the managed process.
-     *
-     * @param pb
-     *            {@link ProcessBuilder} used to exec the process
-     */
-    protected void setupEnvironment(ProcessBuilder pb) {
-        String separator = ":";
-        Configuration conf = UDFContext.getUDFContext().getJobConf();
-        Map<String, String> env = pb.environment();
-        addJobConfToEnvironment(conf, env);
-
-        // Add the current-working-directory to the $PATH
-        File dir = pb.directory();
-        String cwd = (dir != null) ? dir.getAbsolutePath() : System
-                .getProperty("user.dir");
-
-        String envPath = env.get(PATH);
-        if (envPath == null) {
-            envPath = cwd;
-        } else {
-            envPath = envPath + separator + cwd;
-        }
-        env.put(PATH, envPath);
-    }
-
-    void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
-        String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
-        LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
-        if (propsToSend == null) {
-            return;
-        }
-
-        for (String prop : propsToSend.split(",")) {
-            String value = conf.get(prop);
-            if (value == null) {
-                LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
-                continue;
-            }
-            LOG.debug("Setting property in streaming environment: " + prop);
-            envPut(env, prop, value);
-        }
-    }
-
-    void envPut(Map<String, String> env, String name, String value) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Add  env entry:" + name + "=" + value);
-        }
-        env.put(name, value);
-    }
-
-    /**
      * Start execution of the external process.
      *
      * This takes care of setting up the environment of the process and also
@@ -264,26 +196,7 @@ public class ExecutableManager {
      * @throws IOException
      */
     protected void exec() throws IOException {
-        // Set the actual command to run with 'bash -c exec ...'
-        List<String> cmdArgs = new ArrayList<String>();
-
-        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
-          cmdArgs.add("cmd");
-          cmdArgs.add("/c");
-          cmdArgs.add(argvAsString);
-        } else {
-          cmdArgs.add(BASH);
-          cmdArgs.add("-c");
-          StringBuffer sb = new StringBuffer();
-          sb.append("exec ");
-          sb.append(argvAsString);
-          cmdArgs.add(sb.toString());
-        }
-
-        // Start the external process
-        ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs
-                .toArray(new String[cmdArgs.size()]));
-        setupEnvironment(processBuilder);
+        ProcessBuilder processBuilder = StreamingUtil.createProcess(this.command);
         process = processBuilder.start();
         LOG.debug("Started the process for command: " + command);
 
@@ -514,7 +427,7 @@ public class ExecutableManager {
                 try {
                     Result res = new Result();
                     res.result = "Error reading output from Streaming binary:" +
-                            "'" + argvAsString + "':" + t.getMessage();
+                            "'" + command.toString() + "':" + t.getMessage();
                     res.returnStatus = POStatus.STATUS_ERR;
                     sendOutput(binaryOutputQueue, res);
                     killProcess(process);
@@ -547,7 +460,7 @@ public class ExecutableManager {
 
                     }
                     // signal error
-                    String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
+                    String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
                             ie.getMessage();
                     LOG.error(errMsg, ie);
                     res.result = errMsg;
@@ -561,7 +474,7 @@ public class ExecutableManager {
                 } else {
                     // signal Error
 
-                    String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
+                    String errMsg = "'" + command.toString() + "'" + " failed with exit status: "
                             + exitCode;
                     LOG.error(errMsg);
                     res.result = errMsg;

Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Mon Sep 23 15:56:16 2013
@@ -26,6 +26,8 @@ import org.apache.pig.StreamToPig;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
+import com.google.common.base.Charsets;
+
 /**
  * {@link OutputHandler} is responsible for handling the output of the
  * Pig-Streaming external command.
@@ -36,6 +38,9 @@ import org.apache.pig.impl.io.BufferedPo
  * process wrote its output.
  */
 public abstract class OutputHandler {
+    public static final Object END_OF_OUTPUT = new Object();
+    private static final byte[] DEFAULT_RECORD_DELIM = new byte[] {'\n'};
+
     public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
 
     /*
@@ -53,6 +58,11 @@ public abstract class OutputHandler {
     private Text currValue = new Text();
 
     private BufferedPositionedInputStream istream;
+    
+    //Both of these ignore the trailing \n.  So if the
+    //default delimiter is "\n" recordDelimStr is "".
+    private String recordDelimStr = null;
+    private int recordDelimLength = 0;
 
     /**
      * Get the handled <code>OutputType</code>.
@@ -92,8 +102,7 @@ public abstract class OutputHandler {
         }
 
         currValue.clear();
-        int num = in.readLine(currValue);
-        if (num <= 0) {
+        if (!readValue()) {
             return null;
         }
 
@@ -106,6 +115,54 @@ public abstract class OutputHandler {
         }
     }
 
+    private boolean readValue() throws IOException {
+        int num = in.readLine(currValue);
+        if (num <= 0) {
+            return false;
+        }
+
+        while(!isEndOfRow()) {
+            //Need to add back the newline character we ate.
+            currValue.append(new byte[] {'\n'}, 0, 1);
+
+            byte[] lineBytes = readNextLine();
+            if (lineBytes == null) {
+                //We have no more input, so just break;
+                break;
+            }
+            currValue.append(lineBytes, 0, lineBytes.length);
+        }
+        
+        return true;
+    }
+    
+    private byte[] readNextLine() throws IOException {
+        Text line = new Text();
+        int num = in.readLine(line);
+        byte[] lineBytes = line.getBytes();
+        if (num <= 0) {
+            return null;
+        }
+        
+        return lineBytes;
+    }
+
+    private boolean isEndOfRow() {
+        if (recordDelimStr == null) {
+            byte[] recordDelimBa = getRecordDelimiter();
+            recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
+            recordDelimStr = new String(recordDelimBa, 0, recordDelimLength,  Charsets.UTF_8);
+        }
+        if (recordDelimLength == 0 || currValue.getLength() < recordDelimLength) {
+            return true;
+        }
+        return currValue.find(recordDelimStr, currValue.getLength() - recordDelimLength) >= 0;
+    }
+    
+    protected byte[] getRecordDelimiter() {
+        return DEFAULT_RECORD_DELIM;
+    }
+
     /**
      * Close the <code>OutputHandler</code>.
      * @throws IOException

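The readValue/isEndOfRow changes above let one logical record span multiple physical lines: readLine() strips each '\n', so the handler re-appends it and keeps reading until the accumulated value ends with the subclass's record delimiter minus its trailing '\n'. A minimal re-statement of that loop, assuming the "|", "_", '\n' record end that StreamingUDFOutputHandler supplies (so the delimiter checked here is "|_"):

    # Hypothetical re-statement of OutputHandler.readValue() in Python.
    def read_record(lines, delim='|_'):       # delim is the record end minus '\n'
        record = next(lines, None)
        if record is None:
            return None
        while delim and not record.endswith(delim):
            nxt = next(lines, None)           # the value contained an embedded newline
            if nxt is None:
                break                         # no more input; return what we have
            record += '\n' + nxt              # add back the newline readLine() ate
        return record

    # read_record(iter(['foo', 'bar|_']))  ->  'foo\nbar|_'
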
Added: pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,207 @@
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.PigStreamingBase;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.ToDate;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.WritableByteArray;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.StorageUtil;
+
+import com.google.common.base.Charsets;
+
+public class PigStreamingUDF extends PigStreamingBase {
+    private static final byte PRE_WRAP_DELIM = '|';
+    private static final byte POST_WRAP_DELIM = '_';
+    private static final StreamingDelimiters DELIMS = 
+            new StreamingDelimiters(PRE_WRAP_DELIM, POST_WRAP_DELIM, false);
+    
+    private FieldSchema topLevelFs;
+    private static TupleFactory tupleFactory = TupleFactory.getInstance();
+    private static BagFactory bagFactory = BagFactory.getInstance();
+
+    private WritableByteArray out;
+
+    public PigStreamingUDF() {
+        out = new WritableByteArray();
+    }
+    
+    public PigStreamingUDF(FieldSchema topLevelFs) {
+        out = new WritableByteArray();
+        this.topLevelFs = topLevelFs;
+    }
+
+    @Override
+    public WritableByteArray serializeToBytes(Tuple t) throws IOException {
+        out.reset();
+        int sz;
+        Object field;
+        if (t == null) {
+            sz = 0;
+        } else {
+            sz = t.size();
+        }
+        for (int i=0; i < sz; i++) {
+            field = t.get(i);
+            StorageUtil.putField(out, field, DELIMS, true);
+            if (i != sz-1) {
+                out.write(DELIMS.getParamDelim());
+            }
+        }
+        byte[] recordDel = DELIMS.getRecordEnd();
+        out.write(recordDel, 0, recordDel.length);
+        return out;
+    }
+    
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return new Utf8StorageConverter();
+    }
+    
+    @Override
+    public Tuple deserialize(byte[] bytes, int offset, int length) throws IOException {
+        Object o = deserialize(topLevelFs, bytes, 0 + offset, length - DELIMS.getRecordEnd().length); //Drop newline
+        return tupleFactory.newTuple(o);
+    }
+    
+    public byte[] getRecordDelim() {
+        return DELIMS.getRecordEnd();
+    }
+
+    private Object deserialize(FieldSchema fs, byte[] bytes, int startIndex, int endIndex) throws IOException {
+        //If null, return null;
+        if (WritableComparator.compareBytes(
+                bytes, startIndex, DELIMS.getNull().length,
+                DELIMS.getNull(), 0, DELIMS.getNull().length) == 0) {
+            return null;
+        }
+
+        if (fs.type == DataType.BAG) {
+            return deserializeBag(fs, bytes, startIndex + 3, endIndex - 2);
+        } else if (fs.type == DataType.TUPLE) {
+            return deserializeTuple(fs, bytes, startIndex + 3, endIndex - 2);
+        } else if (fs.type == DataType.MAP) {
+            return deserializeMap(bytes, startIndex + 3, endIndex - 2);
+        }
+
+        if (fs.type == DataType.CHARARRAY) {
+            return extractString(bytes, startIndex, endIndex, true);
+        } else if (fs.type == DataType.BYTEARRAY) {
+            return new DataByteArray(bytes, startIndex, endIndex+1);
+        }
+
+        //Can we do this faster?
+        String val = extractString(bytes, startIndex, endIndex, false);
+
+        if (fs.type == DataType.LONG) {
+            return Long.valueOf(val);
+        } else if (fs.type == DataType.INTEGER) {
+            return Integer.valueOf(val);
+        } else if (fs.type == DataType.FLOAT) {
+            return Float.valueOf(val);
+        } else if (fs.type == DataType.DOUBLE) {
+            return Double.valueOf(val);
+        } else if (fs.type == DataType.BOOLEAN) {
+            return Boolean.valueOf(val);
+        } else if (fs.type == DataType.DATETIME) {
+           return ToDate.extractDateTime(val);
+        } else if (fs.type == DataType.BIGINTEGER) {
+            return new BigInteger(val);
+        } else if (fs.type == DataType.BIGDECIMAL) {
+            return new BigDecimal(val);
+        } else {
+            throw new ExecException("Can't deserialize type: " + DataType.findTypeName(fs.type));
+        }
+    }
+    
+    private DataBag deserializeBag(FieldSchema fs, byte[] buf, int startIndex, int endIndex) throws IOException {
+        ArrayList<Tuple> protoBag = new ArrayList<Tuple>();
+        int depth = 0;
+        int fieldStart = startIndex;
+
+        for (int index = startIndex; index <= endIndex; index++) {
+            depth = DELIMS.updateDepth(buf, depth, index);
+            if ( StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+                protoBag.add((Tuple)deserialize(fs.schema.getField(0), buf, fieldStart, index - 1));
+                fieldStart = index + 3;
+            }
+        }
+        return bagFactory.newDefaultBag(protoBag);
+    }
+    
+    private Tuple deserializeTuple(FieldSchema fs, byte[] buf, int startIndex, int endIndex) throws IOException {
+        Schema tupleSchema = fs.schema;
+        
+        ArrayList<Object> protoTuple = new ArrayList<Object>(tupleSchema.size());
+        int depth = 0;
+        int fieldNum = 0;
+        int fieldStart = startIndex;
+        
+
+        for (int index = startIndex; index <= endIndex; index++) {
+            depth = DELIMS.updateDepth(buf, depth, index);
+            if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+                protoTuple.add(deserialize(tupleSchema.getField(fieldNum), buf, fieldStart, index - 1));
+                fieldStart = index + 3;
+                fieldNum++;
+            }
+        }
+        return tupleFactory.newTupleNoCopy(protoTuple);
+    }
+    
+    private Map<String, Object> deserializeMap(byte[] buf, int startIndex, int endIndex) throws IOException {
+        Map<String, Object> map = new HashMap<String, Object>();
+        int depth = 0;
+        int fieldStart = startIndex;
+
+        String key = null;
+        Object val = null;
+
+        for (int index = startIndex; index <= endIndex; index++) {
+            byte currChar = buf[index];
+            depth = DELIMS.updateDepth(buf, depth, index);
+
+            if (currChar == DELIMS.getMapKeyDelim() && depth == 0) {
+                key = extractString(buf, fieldStart, index - 1, true);
+                fieldStart = index + 1;
+            }
+
+            if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+                val = extractString(buf, fieldStart, index - 1, true);
+                map.put(key, val);
+                fieldStart = index + 3;
+            }
+        }
+
+        return map;
+    }
+    
+    private String extractString(byte[] bytes, int startIndex, int endIndex, boolean useUtf8) {
+        int length = endIndex - startIndex + 1;
+        if (useUtf8) {
+            return new String(bytes, startIndex, length, Charsets.UTF_8);
+        } else {
+            return new String(bytes, startIndex, length, Charset.defaultCharset());
+        }
+    }
+    
+    
+
+}
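
Concretely, with PRE_WRAP_DELIM '|' and POST_WRAP_DELIM '_' every structural character is wrapped so it can be told apart from the same character inside user data. A sketch of the framing serializeToBytes appears to produce, inferred from the delimiter definitions here and in StreamingDelimiters below (not output captured from the code):

    # Hypothetical framing, shown as Python byte strings:
    #   two top-level fields 42 and "hi"   ->  b'42|\t_hi|_\n'
    #   a null field                       ->  b'|-_'   (useEmptyNull=False)
    #   a tuple (1,2) as a single field    ->  b'|(_1|,_2|)_'
    #   a bag {(1),(2)}                    ->  b'|{_|(_1|)_|,_|(_2|)_|}_'
    record_end = b'|_\n'   # why deserialize() drops len(record_end) trailing bytes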

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,156 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.hadoop.io.WritableComparator;
+
+public class StreamingDelimiters {
+    //RECORD_END must be \n. This assumption is baked into our logic for reading in 
+    //and parsing input.
+    private static final byte RECORD_END = '\n'; 
+    private static final byte PARAM_DELIM = '\t';
+    private static final byte NULL_BYTE = '-';
+    private static final byte TUPLE_BEGIN = '(';
+    private static final byte TUPLE_END = ')';
+    private static final byte BAG_BEGIN = '{';
+    private static final byte BAG_END = '}';
+    private static final byte MAP_BEGIN = '[';
+    private static final byte MAP_END = ']';
+    private static final byte FIELD_DELIM = ',';
+    private static final byte MAP_KEY_VALUE_DELIM = '#'; //Not wrapped by wrapDelimField
+    private byte preWrapDelim;
+    private byte postWrapDelim;
+    
+    private byte[] tupleBegin;
+    private byte[] tupleEnd;
+    private byte[] bagBegin;
+    private byte[] bagEnd;
+    private byte[] mapBegin;
+    private byte[] mapEnd;
+    private byte[] fieldDelim;
+    private byte[] nullByte;
+    private byte[] paramDelim;
+    private byte[] recordEnd;
+
+    public StreamingDelimiters() {
+        this((byte) 0, (byte) 0, true);
+    }
+    
+    /**
+     * 
+     * @param preWrapDelim
+     * @param postWrapDelim
+     * @param useEmptyNull - In the past an empty field was used to serialize null, but
+     *      this makes it impossible to differentiate between an empty string and null.
+     *      Set to false to use a special character to represent null instead.
+     */
+    public StreamingDelimiters(byte preWrapDelim, byte postWrapDelim, boolean useEmptyNull) {
+        this.preWrapDelim = preWrapDelim;
+        this.postWrapDelim = postWrapDelim;
+        
+        this.tupleBegin = getFullDelim(TUPLE_BEGIN);
+        this.tupleEnd = getFullDelim(TUPLE_END);
+        this.bagBegin = getFullDelim(BAG_BEGIN);
+        this.bagEnd = getFullDelim(BAG_END);
+        this.mapBegin = getFullDelim(MAP_BEGIN);
+        this.mapEnd = getFullDelim(MAP_END);
+        this.fieldDelim = getFullDelim(FIELD_DELIM);
+        
+        if (useEmptyNull) {
+            this.nullByte = new byte[] {};
+        } else {
+            this.nullByte = getFullDelim(NULL_BYTE);
+        }
+        
+        this.paramDelim = getFullDelim(PARAM_DELIM);
+        //recordEnd has to end with the RECORD_END byte
+        this.recordEnd = new byte[] {preWrapDelim, postWrapDelim, RECORD_END};
+    }
+    
+    private byte[] getFullDelim(byte val) {
+        if (preWrapDelim == 0)
+            return new byte[] {val};
+        else
+            return new byte[] {preWrapDelim, val, postWrapDelim};
+    }
+    
+    public byte[] getTupleBegin() {
+        return tupleBegin;
+    }
+    
+    public byte[] getTupleEnd() {
+        return tupleEnd;
+    }
+    
+    public byte[] getBagBegin() {
+        return bagBegin;
+    }
+    
+    public byte[] getBagEnd() {
+        return bagEnd;
+    }
+    
+    public byte[] getMapBegin() {
+        return mapBegin;
+    }
+    
+    public byte[] getMapEnd() {
+        return mapEnd;
+    }
+    
+    public byte[] getFieldDelim() {
+        return fieldDelim;
+    }
+    
+    public byte getMapKeyDelim() {
+        return MAP_KEY_VALUE_DELIM;
+    }
+    
+    public byte[] getNull() {
+        return nullByte;
+    }
+    
+    public byte[] getParamDelim() {
+        return paramDelim;
+    }
+    
+    public byte[] getRecordEnd() {
+        return recordEnd;
+    }
+
+    /**
+     * @return - The new depth.  Depth is increased if at the end of a byte sequence
+     * that indicates the start of a bag, tuple, or map.  Depth is decreased if at the 
+     * end of a byte sequence that indicates the end of a bag, tuple, or map.
+     */
+    public int updateDepth(byte[] buf, int currDepth, int index) {
+        if (index < 2 || preWrapDelim == 0 || buf[index-2] != preWrapDelim || buf[index] != postWrapDelim) {
+            return currDepth;
+        }
+        
+        byte delimChar = preWrapDelim == 0 ? buf[index] : buf[index-1];
+        if (delimChar == BAG_BEGIN || delimChar == TUPLE_BEGIN || delimChar == MAP_BEGIN) {
+            return currDepth + 1;
+        } else if (delimChar == BAG_END || delimChar == TUPLE_END || delimChar == MAP_END) {
+            return currDepth - 1;
+        } else {
+            return currDepth;
+        }
+    }
+    
+    /**
+     * 
+     * @param delimiter
+     * @param buf
+     * @param index
+     * @param depth
+     * @param endIndex
+     * @return - True iff, at nesting depth 0, index is the last index of the buffer
+     *      or the bytes starting at index match the delimiter.
+     */
+    public static boolean isDelimiter(byte[] delimiter, byte[] buf, int index, int depth, int endIndex) {
+        return (depth == 0 && ( index == endIndex ||
+                                ( index <= endIndex - 2 &&
+                                  WritableComparator.compareBytes(
+                                          buf, index, delimiter.length, 
+                                          delimiter, 0, delimiter.length) == 0)));
+    }
+
+ }
\ No newline at end of file
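
updateDepth and isDelimiter together let the deserializer split on a field delimiter only at nesting depth 0: every wrapped begin marker bumps the depth and every wrapped end marker drops it, so a ',' inside an inner tuple never terminates an outer field. A minimal, hypothetical Python re-statement of the same scan under the '|'/'_' wrapped framing:

    # Hypothetical depth scan over b'|(_1|,_2|)_|,_3' (two fields: (1,2) and 3)
    def split_fields(buf, begin=b'({[', end=b')}]', pre=b'|'[0], post=b'_'[0]):
        depth, start, fields = 0, 0, []
        for i in range(len(buf)):
            if i >= 2 and buf[i-2] == pre and buf[i] == post:   # wrapped marker
                if buf[i-1] in begin:
                    depth += 1
                elif buf[i-1] in end:
                    depth -= 1
            if depth == 0 and buf[i:i+3] == b'|,_':             # field delimiter
                fields.append(buf[start:i])
                start = i + 3
        fields.append(buf[start:])
        return fields

    # split_fields(b'|(_1|,_2|)_|,_3')  ->  [b'|(_1|,_2|)_', b'3']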

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,61 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class StreamingUDFException extends ExecException {
+    static final long serialVersionUID = 1;
+    private String message;
+    private String language;
+    private Integer lineNumber;
+
+    public StreamingUDFException() {
+    }
+
+    public StreamingUDFException(String message) {
+        this.message = message;
+    }
+
+    public StreamingUDFException(String message, Integer lineNumber) {
+        this.message = message;
+        this.lineNumber = lineNumber;
+    }
+
+    public StreamingUDFException(String language, String message, Throwable cause) {
+        super(cause);
+        this.language = language;
+        this.message = message + "\n" + cause.getMessage() + "\n";
+    }
+
+    public StreamingUDFException(String language, String message) {
+        this(language, message, (Integer) null);
+    }
+
+    public StreamingUDFException(String language, String message, Integer lineNumber) {
+        this.language = language;
+        this.message = message;
+        this.lineNumber = lineNumber;
+    }
+
+    public String getLanguage() {
+        return language;
+    }
+
+    public Integer getLineNumber() {
+        return lineNumber;
+    }
+
+    @Override
+    public String getMessage() {
+        return this.message;
+    }
+    
+    @Override
+    public String toString() {
+        //Don't modify this without also modifying Launcher.getExceptionFromStrings!
+        String s = getClass().getName();
+        String message = getMessage();
+        String lineNumber = this.getLineNumber() == null ? "" : "" + this.getLineNumber();
+        return (message != null) ? (s + ": " + "LINE " + lineNumber + ": " + message) : s;
+
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,10 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.PigStreamingBase;
+
+public class StreamingUDFInputHandler extends DefaultInputHandler {
+    
+    public StreamingUDFInputHandler(PigStreamingBase serializer) {
+        this.serializer = serializer;
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,14 @@
+package org.apache.pig.impl.streaming;
+
+
+public class StreamingUDFOutputHandler extends DefaultOutputHandler { 
+    
+    public StreamingUDFOutputHandler(PigStreamingUDF deserializer) {
+        this.deserializer = deserializer;
+    }
+    
+    @Override
+    protected byte[] getRecordDelimiter() {
+        return ( ((PigStreamingUDF)deserializer).getRecordDelim() );
+    }
+}

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,16 @@
+package org.apache.pig.impl.streaming;
+
+public class StreamingUDFOutputSchemaException extends Exception {
+    private static final long serialVersionUID = 1L;
+    
+    private int lineNumber;
+
+    public StreamingUDFOutputSchemaException(String message, int lineNumber) {
+        super(message);
+        this.lineNumber = lineNumber;
+    }
+    
+    public int getLineNumber() {
+        return lineNumber;
+    }
+}

Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,109 @@
+package org.apache.pig.impl.streaming;
+
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.impl.util.UDFContext;
+
+public class StreamingUtil {
+    private static Log LOG = LogFactory.getLog(StreamingUtil.class);
+    
+    private static final String BASH = "bash";
+    private static final String PATH = "PATH";
+    
+    /**
+     * Create a ProcessBuilder for the external process that will run the given StreamingCommand.
+     * 
+     * @param command
+     * @return
+     */
+    public static ProcessBuilder createProcess(StreamingCommand command) {
+        // Set the actual command to run with 'bash -c exec ...'
+        List<String> cmdArgs = new ArrayList<String>();
+        String[] argv = command.getCommandArgs();
+
+        StringBuffer argBuffer = new StringBuffer();
+        for (String arg : argv) {
+            argBuffer.append(arg);
+            argBuffer.append(" ");
+        }
+        String argvAsString = argBuffer.toString();
+        
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+          cmdArgs.add("cmd");
+          cmdArgs.add("/c");
+          cmdArgs.add(argvAsString);
+        } else {
+          cmdArgs.add(BASH);
+          cmdArgs.add("-c");
+          StringBuffer sb = new StringBuffer();
+          sb.append("exec ");
+          sb.append(argvAsString);
+          cmdArgs.add(sb.toString());
+        }
+
+        // Start the external process
+        ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs
+                .toArray(new String[cmdArgs.size()]));
+        setupEnvironment(processBuilder);
+        return processBuilder;
+    }
+    
+    /**
+     * Set up the run-time environment of the managed process.
+     * 
+     * @param pb
+     *            {@link ProcessBuilder} used to exec the process
+     */
+    private static void setupEnvironment(ProcessBuilder pb) {
+        String separator = ":";
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+        Map<String, String> env = pb.environment();
+        addJobConfToEnvironment(conf, env);
+
+        // Add the current-working-directory to the $PATH
+        File dir = pb.directory();
+        String cwd = (dir != null) ? dir.getAbsolutePath() : System
+                .getProperty("user.dir");
+
+        String envPath = env.get(PATH);
+        if (envPath == null) {
+            envPath = cwd;
+        } else {
+            envPath = envPath + separator + cwd;
+        }
+        env.put(PATH, envPath);
+    }
+    
+    protected static void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+        String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
+        LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
+        if (propsToSend == null) {
+            return;
+        }
+
+        for (String prop : propsToSend.split(",")) {
+            String value = conf.get(prop);
+            if (value == null) {
+                LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
+                continue;
+            }
+            LOG.debug("Setting property in streaming environment: " + prop);
+            envPut(env, prop, value);
+        }
+    }
+    
+    private static void envPut(Map<String, String> env, String name, String value) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Add  env entry:" + name + "=" + value);
+        }
+        env.put(name, value);
+    }
+}

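For context: on non-Windows hosts, createProcess joins the command arguments with spaces and wraps them as bash -c "exec ..." so the command replaces the shell process instead of running as its child. A minimal, standalone sketch of that wrapping (argv here is illustrative; the real caller builds it from a StreamingCommand):

    import java.util.ArrayList;
    import java.util.List;

    public class WrapDemo {
        public static void main(String[] args) throws Exception {
            String[] argv = {"echo", "hello"};       // hypothetical command args
            StringBuilder joined = new StringBuilder();
            for (String arg : argv) {
                joined.append(arg).append(" ");      // same trailing-space join as above
            }
            List<String> cmd = new ArrayList<String>();
            if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
                cmd.add("cmd"); cmd.add("/c"); cmd.add(joined.toString());
            } else {
                cmd.add("bash"); cmd.add("-c"); cmd.add("exec " + joined.toString());
            }
            Process p = new ProcessBuilder(cmd).inheritIO().start();
            System.exit(p.waitFor());
        }
    }
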
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Sep 23 15:56:16 2013
@@ -42,8 +42,10 @@ import java.util.zip.ZipEntry;
 import org.antlr.runtime.CommonTokenStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.StreamingUDF;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.codehaus.jackson.annotate.JsonPropertyOrder;
 import org.codehaus.jackson.map.annotate.JacksonStdImpl;
@@ -132,7 +134,10 @@ public class JarManager {
      */
     @SuppressWarnings("deprecation")
     public static void createJar(OutputStream os, Set<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
+        JarOutputStream jarFile = new JarOutputStream(os);
+        HashMap<String, String> contents = new HashMap<String, String>();
         Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
             addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
         }
@@ -141,10 +146,16 @@ public class JarManager {
             Class clazz = pigContext.getClassForAlias(func);
             if (clazz != null) {
                 addContainingJar(jarList, clazz, null, pigContext);
+                
+                if (clazz.getSimpleName().equals("StreamingUDF")) {
+                    for (String fileName : StreamingUDF.getResourcesForJar()) {
+                        InputStream in = Launcher.class.getResourceAsStream(fileName);
+                        addStream(jarFile, fileName, in, contents);
+                    }
+                }
             }
         }
-        HashMap<String, String> contents = new HashMap<String, String>();
-        JarOutputStream jarFile = new JarOutputStream(os);
+
         Iterator<JarListEntry> it = jarList.iterator();
         while (it.hasNext()) {
             JarListEntry jarEntry = it.next();

Modified: pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java Mon Sep 23 15:56:16 2013
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.joda.time.DateTime;
-
 import org.apache.hadoop.io.Text;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +37,10 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.streaming.StreamingDelimiters;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Charsets;
 
 /**
  * This util class provides methods that are shared by storage class
@@ -44,9 +48,23 @@ import org.apache.pig.data.TupleFactory;
  *
  */
 public final class StorageUtil {
-
-    private static final String UTF8 = "UTF-8";
-
+    private static Map<Byte, byte[]> TYPE_INDICATOR;
+    static {
+        TYPE_INDICATOR = new HashMap<Byte, byte[]>();
+        TYPE_INDICATOR.put(DataType.BOOLEAN, new byte[] {'B'});
+        TYPE_INDICATOR.put(DataType.INTEGER, new byte[] {'I'});
+        TYPE_INDICATOR.put(DataType.LONG, new byte[] {'L'});
+        TYPE_INDICATOR.put(DataType.FLOAT, new byte[] {'F'});
+        TYPE_INDICATOR.put(DataType.DOUBLE, new byte[] {'D'});
+        TYPE_INDICATOR.put(DataType.BYTEARRAY, new byte[] {'A'});
+        TYPE_INDICATOR.put(DataType.CHARARRAY, new byte[] {'C'});
+        TYPE_INDICATOR.put(DataType.DATETIME, new byte[] {'T'});
+        TYPE_INDICATOR.put(DataType.BIGINTEGER, new byte[] {'N'});
+        TYPE_INDICATOR.put(DataType.BIGDECIMAL, new byte[] {'E'});
+    }
+    
+    public static final StreamingDelimiters DEFAULT_DELIMITERS = new StreamingDelimiters();
+    
     /**
      * Transform a <code>String</code> into a byte representing the
      * field delimiter.
@@ -94,6 +112,14 @@ public final class StorageUtil {
         return fieldDel;
     }
 
+    public static void putField(OutputStream out, Object field) throws IOException {
+        putField(out, field, DEFAULT_DELIMITERS, false);
+    }
+    
+    public static void putField(OutputStream out, Object field, boolean includeTypeInformation) throws IOException {
+        putField(out, field, DEFAULT_DELIMITERS, includeTypeInformation);
+    }
+    
     /**
      * Serialize an object to an {@link OutputStream} in the
      * field-delimited form.
@@ -103,113 +129,112 @@ public final class StorageUtil {
      * @throws IOException if serialization fails.
      */
     @SuppressWarnings("unchecked")
-    public static void putField(OutputStream out, Object field)
+    public static void putField(OutputStream out, Object field, StreamingDelimiters delims, boolean includeTypeInformation)
     throws IOException {
-        //string constants for each delimiter
-        String tupleBeginDelim = "(";
-        String tupleEndDelim = ")";
-        String bagBeginDelim = "{";
-        String bagEndDelim = "}";
-        String mapBeginDelim = "[";
-        String mapEndDelim = "]";
-        String fieldDelim = ",";
-        String mapKeyValueDelim = "#";
-
         switch (DataType.findType(field)) {
         case DataType.NULL:
-            break; // just leave it empty
+            out.write(delims.getNull());
+            break;
 
         case DataType.BOOLEAN:
-            out.write(((Boolean)field).toString().getBytes());
+            writeField(out, ((Boolean)field).toString().getBytes(Charset.defaultCharset()), 
+                    DataType.BOOLEAN, includeTypeInformation);
             break;
 
         case DataType.INTEGER:
-            out.write(((Integer)field).toString().getBytes());
+            writeField(out, ((Integer)field).toString().getBytes(Charset.defaultCharset()), 
+                    DataType.INTEGER, includeTypeInformation);
             break;
 
         case DataType.LONG:
-            out.write(((Long)field).toString().getBytes());
+            writeField(out, ((Long)field).toString().getBytes(Charset.defaultCharset()), 
+                    DataType.LONG, includeTypeInformation);
             break;
 
         case DataType.FLOAT:
-            out.write(((Float)field).toString().getBytes());
+            writeField(out, ((Float)field).toString().getBytes(Charset.defaultCharset()), 
+                    DataType.FLOAT, includeTypeInformation);
             break;
 
         case DataType.DOUBLE:
-            out.write(((Double)field).toString().getBytes());
+            writeField(out, ((Double)field).toString().getBytes(Charset.defaultCharset()), 
+                    DataType.DOUBLE, includeTypeInformation);
             break;
 
         case DataType.BIGINTEGER:
-            out.write(((BigInteger)field).toString().getBytes());
+            writeField(out, ((BigInteger)field).toString().getBytes(Charset.defaultCharset()),
+                    DataType.BIGINTEGER, includeTypeInformation);
             break;
 
         case DataType.BIGDECIMAL:
-            out.write(((BigDecimal)field).toString().getBytes());
+            writeField(out, ((BigDecimal)field).toString().getBytes(Charset.defaultCharset()),
+                    DataType.BIGDECIMAL, includeTypeInformation);
             break;
 
         case DataType.DATETIME:
-            out.write(((DateTime)field).toString().getBytes());
+            writeField(out, ((DateTime)field).toString().getBytes(Charset.defaultCharset()),
+                    DataType.DATETIME, includeTypeInformation);
             break;
 
         case DataType.BYTEARRAY:
-            byte[] b = ((DataByteArray)field).get();
-            out.write(b, 0, b.length);
+            writeField(out, ((DataByteArray)field).get(), 
+                    DataType.BYTEARRAY, includeTypeInformation);
             break;
 
         case DataType.CHARARRAY:
-            // oddly enough, writeBytes writes a string
-            out.write(((String)field).getBytes(UTF8));
+            writeField(out, ((String)field).getBytes(Charsets.UTF_8), 
+                    DataType.CHARARRAY, includeTypeInformation);
             break;
 
         case DataType.MAP:
             boolean mapHasNext = false;
             Map<String, Object> m = (Map<String, Object>)field;
-            out.write(mapBeginDelim.getBytes(UTF8));
+            out.write(delims.getMapBegin());
             for(Map.Entry<String, Object> e: m.entrySet()) {
                 if(mapHasNext) {
-                    out.write(fieldDelim.getBytes(UTF8));
+                    out.write(delims.getFieldDelim());
                 } else {
                     mapHasNext = true;
                 }
-                putField(out, e.getKey());
-                out.write(mapKeyValueDelim.getBytes(UTF8));
-                putField(out, e.getValue());
+                putField(out, e.getKey(), delims, includeTypeInformation);
+                out.write(delims.getMapKeyDelim());
+                putField(out, e.getValue(), delims, includeTypeInformation);
             }
-            out.write(mapEndDelim.getBytes(UTF8));
+            out.write(delims.getMapEnd());
             break;
 
         case DataType.TUPLE:
             boolean tupleHasNext = false;
             Tuple t = (Tuple)field;
-            out.write(tupleBeginDelim.getBytes(UTF8));
+            out.write(delims.getTupleBegin());
             for(int i = 0; i < t.size(); ++i) {
                 if(tupleHasNext) {
-                    out.write(fieldDelim.getBytes(UTF8));
+                    out.write(delims.getFieldDelim());
                 } else {
                     tupleHasNext = true;
                 }
                 try {
-                    putField(out, t.get(i));
+                    putField(out, t.get(i), delims, includeTypeInformation);
                 } catch (ExecException ee) {
                     throw ee;
                 }
             }
-            out.write(tupleEndDelim.getBytes(UTF8));
+            out.write(delims.getTupleEnd());
             break;
 
         case DataType.BAG:
             boolean bagHasNext = false;
-            out.write(bagBeginDelim.getBytes(UTF8));
+            out.write(delims.getBagBegin());
             Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
             while(tupleIter.hasNext()) {
                 if(bagHasNext) {
-                    out.write(fieldDelim.getBytes(UTF8));
+                    out.write(delims.getFieldDelim());
                 } else {
                     bagHasNext = true;
                 }
-                putField(out, (Object)tupleIter.next());
+                putField(out, (Object)tupleIter.next(), delims, includeTypeInformation);
             }
-            out.write(bagEndDelim.getBytes(UTF8));
+            out.write(delims.getBagEnd());
             break;
 
         default: {
@@ -220,6 +245,14 @@ public final class StorageUtil {
 
         }
     }
+    
+    private static void writeField(OutputStream out, byte[] bytes, byte dataType, 
+            boolean includeTypeInformation) throws IOException {
+        if (includeTypeInformation) {
+            out.write(TYPE_INDICATOR.get(dataType));
+        }
+        out.write(bytes);
+    }
 
     /**
      * Transform a line of <code>Text</code> to a <code>Tuple</code>
@@ -260,7 +293,7 @@ public final class StorageUtil {
 
         return TupleFactory.getInstance().newTupleNoCopy(protoTuple);
     }
-
+    
     //---------------------------------------------------------------
     // private methods
 

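The net effect of includeTypeInformation is that every scalar field is now prefixed with the one-byte indicator from the TYPE_INDICATOR table above. A minimal sketch of the new overloads (only the type prefixes are definitive here; container delimiters come from StreamingDelimiters, which is defined elsewhere in this commit):

    import java.io.ByteArrayOutputStream;
    import org.apache.pig.impl.util.StorageUtil;

    public class PutFieldDemo {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // With type information, an Integer is written as 'I' followed by its string form.
            StorageUtil.putField(out, Integer.valueOf(42), true);
            System.out.println(out.toString("UTF-8"));   // prints: I42
            out.reset();
            // Without type information the new overloads behave like the old putField.
            StorageUtil.putField(out, "hello");
            System.out.println(out.toString("UTF-8"));   // prints: hello
        }
    }
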
Modified: pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Sep 23 15:56:16 2013
@@ -50,7 +50,9 @@ public abstract class ScriptEngine {
         jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
         jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
         javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
-        groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine");
+        groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine"),
+        streaming_python(new String[]{"streaming_python"}, new String[]{}, "org.apache.pig.scripting.streaming.python.PythonScriptEngine");
+
 
         private static Set<String> supportedScriptLangs;
         static {

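With this enum entry in place, a script can be registered under the streaming_python keyword. A hedged sketch via the PigServer API (the script path, namespace, and UDF name are illustrative):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class RegisterDemo {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // "udfs.py" and "myfuncs" are illustrative names.
            pig.registerCode("udfs.py", "streaming_python", "myfuncs");
            pig.registerQuery("a = LOAD 'input' AS (line:chararray);");
            pig.registerQuery("b = FOREACH a GENERATE myfuncs.my_udf(line);");
            pig.store("b", "output");
        }
    }
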
Added: pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,127 @@
+package org.apache.pig.scripting;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Charsets;
+
+/**
+ * This class helps a scripting UDF capture user output by managing when to capture output and
+ * where the output is written.
+ *
+ * For illustrate, output is captured only for the last run (with the final set of data), and the
+ * file containing that output is tracked so it can be returned with the illustrate results.
+ * For normal runs, all standard output is written to the user logs.
+ */
+public class ScriptingOutputCapturer {
+    private static Log log = LogFactory.getLog(ScriptingOutputCapturer.class);
+    
+    private static Map<String, String> outputFileNames = new HashMap<String, String>();
+    private static String runId = UUID.randomUUID().toString(); //Unique ID for this run to ensure udf output files aren't corrupted from previous runs.
+
+    //Illustrate will set the static flag telling udf to start capturing its output.  It's up to each
+    //instance to react to it and set its own flag.
+    private static boolean captureOutput = false;
+    private boolean instancedCapturingOutput = false;
+
+    private ExecType execType;
+
+    public ScriptingOutputCapturer(ExecType execType) {
+        this.execType = execType;
+    }
+
+    public String getStandardOutputRootWriteLocation() {
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+        
+        String jobId = conf.get("mapred.job.id");
+        String taskId = conf.get("mapred.task.id");
+        log.debug("JobId: " + jobId);
+        log.debug("TaskId: " + taskId);
+
+        if (execType.isLocal()) {
+            String logDir = System.getProperty("pig.udf.scripting.log.dir");
+            if (logDir == null)
+                logDir = ".";
+            return logDir + "/" + (taskId == null ? "" : (taskId + "_"));
+        } else {
+            String taskLogDir = getTaskLogDir(jobId, taskId);
+            return taskLogDir + "/";
+        }
+    }
+
+    public String getTaskLogDir(String jobId, String taskId) {
+        String taskLogDir;
+        String hadoopLogDir = System.getProperty("hadoop.log.dir");
+        String defaultUserLogDir = hadoopLogDir + File.separator + "userlogs";
+
+        if ( new File(defaultUserLogDir + File.separator + jobId).exists() ) {
+            taskLogDir = defaultUserLogDir + File.separator + jobId + File.separator + taskId;
+        } else if ( new File(defaultUserLogDir + File.separator + taskId).exists() ) {
+            taskLogDir = defaultUserLogDir + File.separator + taskId;
+        } else if ( new File(defaultUserLogDir).exists() ){
+            taskLogDir = defaultUserLogDir;
+        } else {
+            taskLogDir = hadoopLogDir + File.separator + "udfOutput";
+        }
+        return taskLogDir;
+    }
+    
+    public static void startCapturingOutput() {
+       ScriptingOutputCapturer.captureOutput = true;
+    }
+    
+    public static Map<String, String> getUdfOutput() throws IOException {
+        Map<String, String> udfFuncNameToOutput = new HashMap<String,String>();
+        for (Map.Entry<String, String> funcToOutputFileName : outputFileNames.entrySet()) {
+            StringBuffer udfOutput = new StringBuffer();
+            FileInputStream fis = new FileInputStream(funcToOutputFileName.getValue());
+            Reader fr = new InputStreamReader(fis, Charsets.UTF_8);
+            BufferedReader br = new BufferedReader(fr);
+            
+            try {
+                String line = br.readLine();
+                while (line != null) {
+                    udfOutput.append("\t" + line + "\n");
+                    line = br.readLine();
+                }
+            } finally {
+                br.close();
+            }
+            udfFuncNameToOutput.put(funcToOutputFileName.getKey(), udfOutput.toString());
+        }
+        return udfFuncNameToOutput;
+    }
+    
+    public void registerOutputLocation(String functionName, String fileName) {
+        outputFileNames.put(functionName, fileName);
+    }
+    
+    public static String getRunId() {
+        return runId;
+    }
+    
+    public static boolean isClassCapturingOutput() {
+        return ScriptingOutputCapturer.captureOutput;
+    }
+    
+    public boolean isInstanceCapturingOutput() {
+        return this.instancedCapturingOutput;
+    }
+    
+    public void setInstanceCapturingOutput(boolean instanceCapturingOutput) {
+        this.instancedCapturingOutput = instanceCapturingOutput;
+    }
+}

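A sketch of the capture lifecycle described in the class comment, using only the methods added above (the function name and output path are illustrative; inside a real job the root path would come from getStandardOutputRootWriteLocation(), which needs a populated JobConf):

    import java.io.File;
    import java.util.Map;
    import org.apache.pig.ExecType;
    import org.apache.pig.scripting.ScriptingOutputCapturer;

    public class CaptureDemo {
        public static void main(String[] args) throws Exception {
            ScriptingOutputCapturer capturer = new ScriptingOutputCapturer(ExecType.LOCAL);
            ScriptingOutputCapturer.startCapturingOutput();   // normally flipped by illustrate
            if (ScriptingOutputCapturer.isClassCapturingOutput()
                    && !capturer.isInstanceCapturingOutput()) {
                // Illustrative path; the run id keeps files from colliding across runs.
                File out = new File("/tmp/" + ScriptingOutputCapturer.getRunId() + "_my_udf.out");
                out.createNewFile();
                capturer.registerOutputLocation("my_udf", out.getPath());
                capturer.setInstanceCapturingOutput(true);
            }
            // Illustrate later collects per-function output (tab-indented, newline-joined).
            Map<String, String> captured = ScriptingOutputCapturer.getUdfOutput();
            System.out.println(captured);
        }
    }
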
Added: pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,111 @@
+package org.apache.pig.scripting.streaming.python;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+
+public class PythonScriptEngine extends ScriptEngine {
+    private static final Log log = LogFactory.getLog(PythonScriptEngine.class);
+
+    @Override
+    public void registerFunctions(String path, String namespace,
+            PigContext pigContext) throws IOException {
+        
+        String fileName = path.substring(0, path.length() - ".py".length());
+        log.debug("Path: " + path + " FileName: " + fileName + " Namespace: " + namespace);
+        File f = new File(path);
+
+        if (!f.canRead()) {
+            throw new IOException("Can't read file: " + path);
+        }
+        
+        FileInputStream fin = new FileInputStream(f);
+        List<String[]> functions = getFunctions(fin);
+        namespace = namespace == null ? "" : namespace + NAMESPACE_SEPARATOR;
+        for(String[] functionInfo : functions) {
+            String name = functionInfo[0];
+            String schemaString = functionInfo[1];
+            String schemaLineNumber = functionInfo[2];
+            String alias = namespace + name;
+            String execType = (pigContext.getExecType() == ExecType.LOCAL? "local" : "mapreduce");
+            String isIllustrate = (Boolean.valueOf(pigContext.inIllustrator)).toString();
+            log.debug("Registering Function: " + alias);
+            pigContext.registerFunction(alias, 
+                                        new FuncSpec("StreamingUDF", 
+                                                new String[] {
+                                                    "python", 
+                                                    fileName, name, 
+                                                    schemaString, schemaLineNumber,
+                                                    execType, isIllustrate
+                                        }));
+        }
+        fin.close();
+    }
+
+    @Override
+    protected Map<String, List<PigStats>> main(PigContext context,
+            String scriptFile) throws IOException {
+        log.warn("ScriptFile: " + scriptFile);
+        registerFunctions(scriptFile, null, context);
+        return getPigStatsMap();
+    }
+
+    @Override
+    protected String getScriptingLang() {
+        return "streaming_python";
+    }
+
+    @Override
+    protected Map<String, Object> getParamsFromVariables() throws IOException {
+        throw new IOException("Unsupported Operation");
+    }
+    
+    private static final Pattern pSchema = Pattern.compile("^\\s*\\W+outputSchema.*");
+    private static final Pattern pDef = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");
+
+    private static List<String[]> getFunctions(InputStream is) throws IOException {
+        List<String[]> functions = new ArrayList<String[]>();
+        InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset());
+        BufferedReader br = new BufferedReader(in);
+        String line = br.readLine();
+        String schemaString = null;
+        String schemaLineNumber = null;
+        int lineNumber = 1;
+        while (line != null) {
+            if (pSchema.matcher(line).matches()) {
+                int start = line.indexOf("(") + 2; //drop brackets/quotes
+                int end = line.lastIndexOf(")") - 1;
+                schemaString = line.substring(start,end).trim();
+                schemaLineNumber = "" + lineNumber;
+            } else if (pDef.matcher(line).matches()) {
+                int start = line.indexOf("def ") + "def ".length();
+                int end = line.indexOf('(');
+                String functionName = line.substring(start, end).trim();
+                if (schemaString != null) {
+                    String[] funcInfo = {functionName, schemaString, schemaLineNumber};
+                    functions.add(funcInfo);
+                    schemaString = null;
+                }
+            }
+            line = br.readLine();
+            lineNumber++;
+        }
+        return functions;
+    }
+}
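
For reference, getFunctions pairs each outputSchema decorator line with the next def line it encounters. A quick check of the two patterns against representative lines from a streaming Python UDF (the sample strings are illustrative):

    import java.util.regex.Pattern;

    public class SchemaScanDemo {
        // Same patterns as in PythonScriptEngine above.
        private static final Pattern pSchema = Pattern.compile("^\\s*\\W+outputSchema.*");
        private static final Pattern pDef = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");

        public static void main(String[] args) {
            String decorator = "@outputSchema('word:chararray')";
            String def = "def get_word(line):";
            System.out.println(pSchema.matcher(decorator).matches()); // true
            System.out.println(pDef.matcher(def).matches());          // true
            // getFunctions would extract "word:chararray" (dropping the bracket and
            // quote on each side) and register it for the function name "get_word".
        }
    }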