Posted to commits@pig.apache.org by da...@apache.org on 2013/09/23 17:56:17 UTC
svn commit: r1525632 [1/3] - in /pig/trunk: ./
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/streaming/
src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/
src/org/apache/pig/scripting/streaming/
src/org/apache/pig/scripting/streamin...
Author: daijy
Date: Mon Sep 23 15:56:16 2013
New Revision: 1525632
URL: http://svn.apache.org/r1525632
Log:
PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no JVM implementation
Added:
pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java
pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java
pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
pig/trunk/src/org/apache/pig/scripting/streaming/
pig/trunk/src/org/apache/pig/scripting/streaming/python/
pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
pig/trunk/src/python/
pig/trunk/src/python/streaming/
pig/trunk/src/python/streaming/controller.py
pig/trunk/src/python/streaming/pig_util.py
pig/trunk/test/e2e/pig/udfs/cpython/
pig/trunk/test/e2e/pig/udfs/cpython/morepythonudfs.py
pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py
pig/trunk/test/org/apache/pig/impl/builtin/
pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
pig/trunk/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUtil.java
pig/trunk/test/org/apache/pig/test/TestPigStreaming.java
pig/trunk/test/python/
pig/trunk/test/python/streaming/
pig/trunk/test/python/streaming/test_controller.py
Removed:
pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
pig/trunk/src/org/apache/pig/impl/util/JarManager.java
pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
pig/trunk/test/e2e/pig/build.xml
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/excluded-tests-23
pig/trunk/test/unit-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 23 15:56:16 2013
@@ -30,6 +30,9 @@ PIG-3174: Remove rpm and deb artifacts f
IMPROVEMENTS
+PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no
+ JVM implementation. (jeremykarn via daijy)
+
PIG-3199: Provide a method to retriever name of loader/storer in PigServer (prkommireddi via daijy)
PIG-3367: Add assert keyword (operator) in pig (aniket486)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Sep 23 15:56:16 2013
@@ -40,6 +40,7 @@
<!-- source properties -->
<property name="lib.dir" value="${basedir}/lib" />
<property name="src.dir" value="${basedir}/src" />
+ <property name="python.src.dir" value="${src.dir}/python" />
<property name="src.lib.dir" value="${basedir}/lib-src" />
<property name="src.gen.dir" value="${basedir}/src-gen" />
<property name="docs.dir" value="${basedir}/src/docs" />
@@ -583,6 +584,9 @@
</javac>
<copy file="${src.dir}/org/apache/pig/tools/grunt/autocomplete" todir="${build.classes}/org/apache/pig/tools/grunt"/>
<copy file="${src.dir}/org/apache/pig/tools/grunt/autocomplete_aliases" todir="${build.classes}/org/apache/pig/tools/grunt"/>
+ <copy todir="${build.classes}/python">
+ <fileset dir="${python.src.dir}"/>
+ </copy>
</sequential>
</macrodef>
@@ -651,6 +655,7 @@
</manifest>
<fileset dir="${src.lib.dir}/shock"/>
<fileset dir="${src.lib.dir}/bzip2"/>
+ <fileset dir="${python.src.dir}"/>
</jar>
</target>
<target name="javadoc-jar" depends="cc-compile, javadoc">
Added: pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java (added)
+++ pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,463 @@
+package org.apache.pig.impl.builtin;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.streaming.InputHandler;
+import org.apache.pig.impl.streaming.OutputHandler;
+import org.apache.pig.impl.streaming.PigStreamingUDF;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingUDFException;
+import org.apache.pig.impl.streaming.StreamingUDFInputHandler;
+import org.apache.pig.impl.streaming.StreamingUDFOutputHandler;
+import org.apache.pig.impl.streaming.StreamingUDFOutputSchemaException;
+import org.apache.pig.impl.streaming.StreamingUtil;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.scripting.ScriptingOutputCapturer;
+
+import com.google.common.base.Charsets;
+
+public class StreamingUDF extends EvalFunc<Object> {
+ private static final Log log = LogFactory.getLog(StreamingUDF.class);
+
+ private static final String PYTHON_CONTROLLER_JAR_PATH = "/python/streaming/controller.py"; //Relative to root of pig jar.
+ private static final String PYTHON_PIG_UTIL_PATH = "/python/streaming/pig_util.py"; //Relative to root of pig jar.
+
+ //Indexes for arguments being passed to external process
+ private static final int UDF_LANGUAGE = 0;
+ private static final int PATH_TO_CONTROLLER_FILE = 1;
+ private static final int UDF_FILE_NAME = 2; //Name of file where UDF function is defined
+ private static final int UDF_FILE_PATH = 3; //Path to directory containing file where UDF function is defined
+ private static final int UDF_NAME = 4; //Name of UDF function being called.
+ private static final int PATH_TO_FILE_CACHE = 5; //Directory where required files (like pig_util) are cached on cluster nodes.
+ private static final int STD_OUT_OUTPUT_PATH = 6; //File for output when the user writes to standard output.
+ private static final int STD_ERR_OUTPUT_PATH = 7; //File for output when the user writes to standard error.
+ private static final int CONTROLLER_LOG_FILE_PATH = 8; //Controller log file logs progress through the controller script, not user code.
+ private static final int IS_ILLUSTRATE = 9; //Controller captures output differently in illustrate vs running.
+
+ private String language;
+ private String filePath;
+ private String funcName;
+ private Schema schema;
+ private ExecType execType;
+ private String isIllustrate;
+
+ private boolean initialized = false;
+ private ScriptingOutputCapturer soc;
+
+ private Process process; // Handle to the external process
+ private ProcessErrorThread stderrThread; // thread to get process stderr
+ private ProcessInputThread stdinThread; // thread to send input to process
+ private ProcessOutputThread stdoutThread; //thread to read output from process
+
+ private InputHandler inputHandler;
+ private OutputHandler outputHandler;
+
+ private BlockingQueue<Tuple> inputQueue;
+ private BlockingQueue<Object> outputQueue;
+
+ private DataOutputStream stdin; // stdin of the process
+ private InputStream stdout; // stdout of the process
+ private InputStream stderr; // stderr of the process
+
+ private static final Object ERROR_OUTPUT = new Object();
+ private static final Object NULL_OBJECT = new Object(); //BlockingQueue can't hold null, so use a placeholder object instead.
+
+ private volatile StreamingUDFException outerrThreadsError;
+
+ public static final String TURN_ON_OUTPUT_CAPTURING = "TURN_ON_OUTPUT_CAPTURING";
+
+ public StreamingUDF(String language,
+ String filePath, String funcName,
+ String outputSchemaString, String schemaLineNumber,
+ String execType, String isIllustrate)
+ throws StreamingUDFOutputSchemaException, ExecException {
+ this.language = language;
+ this.filePath = filePath;
+ this.funcName = funcName;
+ try {
+ this.schema = Utils.getSchemaFromString(outputSchemaString);
+ //ExecTypeProvider.fromString doesn't seem to load the ExecTypes in
+ //mapreduce mode, so we'll try to figure out the exec type ourselves.
+ if (execType.equals("local")) {
+ this.execType = ExecType.LOCAL;
+ } else if (execType.equals("mapreduce")) {
+ this.execType = ExecType.MAPREDUCE;
+ } else {
+ //Not sure what exec type - try to get it from the string.
+ this.execType = ExecTypeProvider.fromString(execType);
+ }
+ } catch (ParserException pe) {
+ throw new StreamingUDFOutputSchemaException(pe.getMessage(), Integer.valueOf(schemaLineNumber));
+ } catch (IOException ioe) {
+ String errorMessage = "Invalid exectype passed to StreamingUDF. Should be local or mapreduce";
+ log.error(errorMessage, ioe);
+ throw new ExecException(errorMessage, ioe);
+ }
+ this.isIllustrate = isIllustrate;
+ }
+
+ @Override
+ public Object exec(Tuple input) throws IOException {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return getOutput(input);
+ }
+
+ private void initialize() throws ExecException, IOException {
+ inputQueue = new ArrayBlockingQueue<Tuple>(1);
+ outputQueue = new ArrayBlockingQueue<Object>(2);
+ soc = new ScriptingOutputCapturer(execType);
+ startUdfController();
+ createInputHandlers();
+ setStreams();
+ startThreads();
+ }
+
+ private StreamingCommand startUdfController() throws IOException {
+ StreamingCommand sc = new StreamingCommand(null, constructCommand());
+ ProcessBuilder processBuilder = StreamingUtil.createProcess(sc);
+ process = processBuilder.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new ProcessKiller() ) );
+ return sc;
+ }
+
+ private String[] constructCommand() throws IOException {
+ String[] command = new String[10];
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
+
+ String jarPath = conf.get("mapred.jar");
+ String jobDir;
+ if (jarPath != null) {
+ jobDir = new File(jarPath).getParent();
+ } else {
+ jobDir = "";
+ }
+
+ String standardOutputRootWriteLocation = soc.getStandardOutputRootWriteLocation();
+ String controllerLogFileName, outFileName, errOutFileName;
+
+ if (execType.isLocal()) {
+ controllerLogFileName = standardOutputRootWriteLocation + funcName + "_python.log";
+ outFileName = standardOutputRootWriteLocation + "cpython_" + funcName + "_" + ScriptingOutputCapturer.getRunId() + ".out";
+ errOutFileName = standardOutputRootWriteLocation + "cpython_" + funcName + "_" + ScriptingOutputCapturer.getRunId() + ".err";
+ } else {
+ controllerLogFileName = standardOutputRootWriteLocation + funcName + "_python.log";
+ outFileName = standardOutputRootWriteLocation + funcName + ".out";
+ errOutFileName = standardOutputRootWriteLocation + funcName + ".err";
+ }
+
+ soc.registerOutputLocation(funcName, outFileName);
+
+ command[UDF_LANGUAGE] = language;
+ command[PATH_TO_CONTROLLER_FILE] = getControllerPath(jobDir);
+ int lastSeparator = filePath.lastIndexOf(File.separator) + 1;
+ command[UDF_FILE_NAME] = filePath.substring(lastSeparator);
+ command[UDF_FILE_PATH] = lastSeparator <= 0 ?
+ "." :
+ filePath.substring(0, lastSeparator - 1);
+ command[UDF_NAME] = funcName;
+ command[PATH_TO_FILE_CACHE] = "\"" + jobDir + filePath.substring(0, lastSeparator) + "\"";
+ command[STD_OUT_OUTPUT_PATH] = outFileName;
+ command[STD_ERR_OUTPUT_PATH] = errOutFileName;
+ command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
+ command[IS_ILLUSTRATE] = isIllustrate;
+ return command;
+ }
+
+ private void createInputHandlers() throws ExecException, FrontendException {
+ PigStreamingUDF serializer = new PigStreamingUDF();
+ this.inputHandler = new StreamingUDFInputHandler(serializer);
+ PigStreamingUDF deserializer = new PigStreamingUDF(schema.getField(0));
+ this.outputHandler = new StreamingUDFOutputHandler(deserializer);
+ }
+
+ private void setStreams() throws IOException {
+ stdout = new DataInputStream(new BufferedInputStream(process
+ .getInputStream()));
+ outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
+ 0, Long.MAX_VALUE);
+
+ stdin = new DataOutputStream(new BufferedOutputStream(process
+ .getOutputStream()));
+ inputHandler.bindTo(stdin);
+
+ stderr = new DataInputStream(new BufferedInputStream(process
+ .getErrorStream()));
+ }
+
+ private void startThreads() {
+ stdinThread = new ProcessInputThread();
+ stdinThread.start();
+
+ stdoutThread = new ProcessOutputThread();
+ stdoutThread.start();
+
+ stderrThread = new ProcessErrorThread();
+ stderrThread.start();
+ }
+
+ /**
+ * Find the path to the controller file for the streaming language.
+ *
+ * First check the path to the job jar; if the file is not found there (as in
+ * the case of running hadoop in standalone mode), write the necessary files
+ * to temporary files and return that path.
+ *
+ * @param jarPath - directory under which the controller script is expected
+ * @return the path to the controller file for the streaming language
+ * @throws IOException
+ */
+ private String getControllerPath(String jarPath) throws IOException {
+ if (language.toLowerCase().equals("python")) {
+ String controllerPath = jarPath + PYTHON_CONTROLLER_JAR_PATH;
+ File controller = new File(controllerPath);
+ if (!controller.exists()) {
+ File controllerFile = File.createTempFile("controller", ".py");
+ InputStream pythonControllerStream = Launcher.class.getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
+ try {
+ FileUtils.copyInputStreamToFile(pythonControllerStream, controllerFile);
+ } finally {
+ pythonControllerStream.close();
+ }
+ controllerFile.deleteOnExit();
+ File pigUtilFile = new File(controllerFile.getParent() + "/pig_util.py");
+ pigUtilFile.deleteOnExit();
+ InputStream pythonUtilStream = Launcher.class.getResourceAsStream(PYTHON_PIG_UTIL_PATH);
+ try {
+ FileUtils.copyInputStreamToFile(pythonUtilStream, pigUtilFile);
+ } finally {
+ pythonUtilStream.close();
+ }
+ controllerPath = controllerFile.getAbsolutePath();
+ }
+ return controllerPath;
+ } else {
+ throw new ExecException("Invalid language: " + language);
+ }
+ }
+
+ /**
+ * Returns a list of file names (relative to root of pig jar) of files that need to be
+ * included in the jar shipped to the cluster.
+ *
+ * This will need to be smarter as more languages are added and the
+ * controller files grow large.
+ *
+ * @return list of file names (relative to the root of the pig jar) to ship
+ */
+ public static List<String> getResourcesForJar() {
+ List<String> files = new ArrayList<String>();
+ files.add(PYTHON_CONTROLLER_JAR_PATH);
+ files.add(PYTHON_PIG_UTIL_PATH);
+ return files;
+ }
+
+ private Object getOutput(Tuple input) throws ExecException {
+ if (outputQueue == null) {
+ throw new ExecException("Process has already been shut down. No way to retrieve output for input: " + input);
+ }
+
+ if (ScriptingOutputCapturer.isClassCapturingOutput() &&
+ !soc.isInstanceCapturingOutput()) {
+ Tuple t = TupleFactory.getInstance().newTuple(TURN_ON_OUTPUT_CAPTURING);
+ try {
+ inputQueue.put(t);
+ } catch (InterruptedException e) {
+ throw new ExecException("Failed adding capture input flag to inputQueue");
+ }
+ soc.setInstanceCapturingOutput(true);
+ }
+
+ try {
+ if (this.getInputSchema() == null || this.getInputSchema().size() == 0) {
+ //When nothing is passed into the UDF, the tuple
+ //being sent is the full tuple for the relation.
+ //We want it to be empty (since that's what the user wrote).
+ input = TupleFactory.getInstance().newTuple(0);
+ }
+ inputQueue.put(input);
+ } catch (Exception e) {
+ throw new ExecException("Failed adding input to inputQueue", e);
+ }
+ Object o = null;
+ try {
+ if (outputQueue != null) {
+ o = outputQueue.take();
+ if (o == NULL_OBJECT) {
+ o = null;
+ }
+ }
+ } catch (Exception e) {
+ throw new ExecException("Problem getting output", e);
+ }
+
+ if (o == ERROR_OUTPUT) {
+ outputQueue = null;
+ if (outerrThreadsError == null) {
+ outerrThreadsError = new StreamingUDFException(this.language, "Problem with streaming udf. Can't recreate exception.");
+ }
+ throw outerrThreadsError;
+ }
+
+ return o;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return this.schema;
+ }
+
+ /**
+ * The thread which consumes input and feeds it to the Process
+ */
+ class ProcessInputThread extends Thread {
+ ProcessInputThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ log.debug("Starting PIT");
+ while (true) {
+ Tuple inputTuple = inputQueue.take();
+ inputHandler.putNext(inputTuple);
+ try {
+ stdin.flush();
+ } catch(Exception e) {
+ return;
+ }
+ }
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ }
+
+ private static final int WAIT_FOR_ERROR_LENGTH = 500;
+ private static final int MAX_WAIT_FOR_ERROR_ATTEMPTS = 5;
+
+ /**
+ * The thread which consumes output from process
+ */
+ class ProcessOutputThread extends Thread {
+ ProcessOutputThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ Object o = null;
+ try{
+ log.debug("Starting POT");
+ //StreamUDFToPig wraps object in single element tuple
+ o = outputHandler.getNext().get(0);
+ while (o != OutputHandler.END_OF_OUTPUT) {
+ if (o != null)
+ outputQueue.put(o);
+ else
+ outputQueue.put(NULL_OBJECT);
+ o = outputHandler.getNext().get(0);
+ }
+ } catch(Exception e) {
+ if (outputQueue != null) {
+ try {
+ //Give error thread a chance to check the standard error output
+ //for an exception message.
+ int attempt = 0;
+ while (stderrThread.isAlive() && attempt < MAX_WAIT_FOR_ERROR_ATTEMPTS) {
+ Thread.sleep(WAIT_FOR_ERROR_LENGTH);
+ attempt++;
+ }
+ //Only write this if no other error. Don't want to overwrite
+ //an error from the error thread.
+ if (outerrThreadsError == null) {
+ outerrThreadsError = new StreamingUDFException(language, "Error deserializing output. Please check that the declared outputSchema for function " +
+ funcName + " matches the data type being returned.", e);
+ }
+ outputQueue.put(ERROR_OUTPUT); //Need to wake main thread.
+ } catch(InterruptedException ie) {
+ log.error(ie);
+ }
+ }
+ }
+ }
+ }
+
+ class ProcessErrorThread extends Thread {
+ public ProcessErrorThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ log.debug("Starting PET");
+ Integer lineNumber = null;
+ StringBuffer error = new StringBuffer();
+ String errInput;
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(stderr, Charsets.UTF_8));
+ while ((errInput = reader.readLine()) != null) {
+ //The first line of the error stream is usually the line number of the error.
+ //If it's not a number, just treat it as the first line of the error message.
+ if (lineNumber == null) {
+ try {
+ lineNumber = Integer.valueOf(errInput);
+ } catch (NumberFormatException nfe) {
+ error.append(errInput + "\n");
+ }
+ } else {
+ error.append(errInput + "\n");
+ }
+ }
+ outerrThreadsError = new StreamingUDFException(language, error.toString(), lineNumber);
+ if (outputQueue != null) {
+ outputQueue.put(ERROR_OUTPUT); //Need to wake main thread.
+ }
+ if (stderr != null) {
+ stderr.close();
+ stderr = null;
+ }
+ } catch (IOException e) {
+ log.debug("Process Ended");
+ } catch (Exception e) {
+ log.error("standard error problem", e);
+ }
+ }
+ }
+
+ public class ProcessKiller implements Runnable {
+ public void run() {
+ process.destroy();
+ }
+ }
+}
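
The core of StreamingUDF is the handshake between exec() and the daemon threads: exec() puts the input tuple on a single-slot inputQueue and blocks on outputQueue.take(), while the stdin/stdout threads pump data through the external controller process. A minimal, self-contained sketch of that pattern (plain JDK only; the worker thread here stands in for the external process, purely for illustration):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class QueueHandshakeSketch {
        // Single-slot input queue: a second exec() call blocks until the
        // worker has drained the previous input (as in StreamingUDF).
        private final BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(1);
        private final BlockingQueue<String> outputQueue = new ArrayBlockingQueue<>(2);

        public QueueHandshakeSketch() {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        // Stands in for inputHandler.putNext(...) feeding the
                        // process and the stdout thread reading its answer.
                        outputQueue.put(inputQueue.take().toUpperCase());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        // Mirrors getOutput(): enqueue one input, block for one output.
        public String exec(String input) throws InterruptedException {
            inputQueue.put(input);
            return outputQueue.take();
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println(new QueueHandshakeSketch().exec("hello")); // HELLO
        }
    }

The real class also reserves sentinel objects (ERROR_OUTPUT, NULL_OBJECT) because a BlockingQueue rejects null and an error must be able to wake the blocked caller.
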
Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Mon Sep 23 15:56:16 2013
@@ -17,32 +17,24 @@
*/
package org.apache.pig.impl.streaming;
-import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.streaming.InputHandler.InputType;
import org.apache.pig.impl.streaming.OutputHandler.OutputType;
import org.apache.pig.impl.util.UDFContext;
@@ -60,12 +52,9 @@ import org.apache.pig.impl.util.UDFConte
public class ExecutableManager {
private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
private static final int SUCCESS = 0;
- private static final String PATH = "PATH";
- private static final String BASH = "bash";
private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
protected StreamingCommand command; // Streaming command to be run
- String argvAsString; // Parsed commands
Process process; // Handle to the process
protected int exitCode = -127; // Exit code of the process
@@ -109,12 +98,6 @@ public class ExecutableManager {
public void configure(POStream stream) throws IOException, ExecException {
this.poStream = stream;
this.command = stream.getCommand();
- String[] argv = this.command.getCommandArgs();
- argvAsString = "";
- for (String arg : argv) {
- argvAsString += arg;
- argvAsString += " ";
- }
// Create the input/output handlers
this.inputHandler = HandlerFactory.createInputHandler(command);
@@ -204,57 +187,6 @@ public class ExecutableManager {
}
/**
- * Set up the run-time environment of the managed process.
- *
- * @param pb
- * {@link ProcessBuilder} used to exec the process
- */
- protected void setupEnvironment(ProcessBuilder pb) {
- String separator = ":";
- Configuration conf = UDFContext.getUDFContext().getJobConf();
- Map<String, String> env = pb.environment();
- addJobConfToEnvironment(conf, env);
-
- // Add the current-working-directory to the $PATH
- File dir = pb.directory();
- String cwd = (dir != null) ? dir.getAbsolutePath() : System
- .getProperty("user.dir");
-
- String envPath = env.get(PATH);
- if (envPath == null) {
- envPath = cwd;
- } else {
- envPath = envPath + separator + cwd;
- }
- env.put(PATH, envPath);
- }
-
- void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
- String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
- LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
- if (propsToSend == null) {
- return;
- }
-
- for (String prop : propsToSend.split(",")) {
- String value = conf.get(prop);
- if (value == null) {
- LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
- continue;
- }
- LOG.debug("Setting property in streaming environment: " + prop);
- envPut(env, prop, value);
- }
- }
-
- void envPut(Map<String, String> env, String name, String value) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Add env entry:" + name + "=" + value);
- }
- env.put(name, value);
- }
-
- /**
* Start execution of the external process.
*
* This takes care of setting up the environment of the process and also
@@ -264,26 +196,7 @@ public class ExecutableManager {
* @throws IOException
*/
protected void exec() throws IOException {
- // Set the actual command to run with 'bash -c exec ...'
- List<String> cmdArgs = new ArrayList<String>();
-
- if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
- cmdArgs.add("cmd");
- cmdArgs.add("/c");
- cmdArgs.add(argvAsString);
- } else {
- cmdArgs.add(BASH);
- cmdArgs.add("-c");
- StringBuffer sb = new StringBuffer();
- sb.append("exec ");
- sb.append(argvAsString);
- cmdArgs.add(sb.toString());
- }
-
- // Start the external process
- ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs
- .toArray(new String[cmdArgs.size()]));
- setupEnvironment(processBuilder);
+ ProcessBuilder processBuilder = StreamingUtil.createProcess(this.command);
process = processBuilder.start();
LOG.debug("Started the process for command: " + command);
@@ -514,7 +427,7 @@ public class ExecutableManager {
try {
Result res = new Result();
res.result = "Error reading output from Streaming binary:" +
- "'" + argvAsString + "':" + t.getMessage();
+ "'" + command.toString() + "':" + t.getMessage();
res.returnStatus = POStatus.STATUS_ERR;
sendOutput(binaryOutputQueue, res);
killProcess(process);
@@ -547,7 +460,7 @@ public class ExecutableManager {
}
// signal error
- String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
+ String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
ie.getMessage();
LOG.error(errMsg, ie);
res.result = errMsg;
@@ -561,7 +474,7 @@ public class ExecutableManager {
} else {
// signal Error
- String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
+ String errMsg = "'" + command.toString() + "'" + " failed with exit status: "
+ exitCode;
LOG.error(errMsg);
res.result = errMsg;
Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Mon Sep 23 15:56:16 2013
@@ -26,6 +26,8 @@ import org.apache.pig.StreamToPig;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import com.google.common.base.Charsets;
+
/**
* {@link OutputHandler} is responsible for handling the output of the
* Pig-Streaming external command.
@@ -36,6 +38,9 @@ import org.apache.pig.impl.io.BufferedPo
* process wrote its output.
*/
public abstract class OutputHandler {
+ public static final Object END_OF_OUTPUT = new Object();
+ private static final byte[] DEFAULT_RECORD_DELIM = new byte[] {'\n'};
+
public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
/*
@@ -53,6 +58,11 @@ public abstract class OutputHandler {
private Text currValue = new Text();
private BufferedPositionedInputStream istream;
+
+ //Both of these ignore the trailing \n, so if the
+ //default delimiter is "\n", recordDelimStr is "".
+ private String recordDelimStr = null;
+ private int recordDelimLength = 0;
/**
* Get the handled <code>OutputType</code>.
@@ -92,8 +102,7 @@ public abstract class OutputHandler {
}
currValue.clear();
- int num = in.readLine(currValue);
- if (num <= 0) {
+ if (!readValue()) {
return null;
}
@@ -106,6 +115,54 @@ public abstract class OutputHandler {
}
}
+ private boolean readValue() throws IOException {
+ int num = in.readLine(currValue);
+ if (num <= 0) {
+ return false;
+ }
+
+ while(!isEndOfRow()) {
+ //Need to add back the newline character we ate.
+ currValue.append(new byte[] {'\n'}, 0, 1);
+
+ byte[] lineBytes = readNextLine();
+ if (lineBytes == null) {
+ //We have no more input, so just break;
+ break;
+ }
+ currValue.append(lineBytes, 0, lineBytes.length);
+ }
+
+ return true;
+ }
+
+ private byte[] readNextLine() throws IOException {
+ Text line = new Text();
+ int num = in.readLine(line);
+ byte[] lineBytes = line.getBytes();
+ if (num <= 0) {
+ return null;
+ }
+
+ return lineBytes;
+ }
+
+ private boolean isEndOfRow() {
+ if (recordDelimStr == null) {
+ byte[] recordDelimBa = getRecordDelimiter();
+ recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
+ recordDelimStr = new String(recordDelimBa, 0, recordDelimLength, Charsets.UTF_8);
+ }
+ if (recordDelimLength == 0 || currValue.getLength() < recordDelimLength) {
+ return true;
+ }
+ return currValue.find(recordDelimStr, currValue.getLength() - recordDelimLength) >= 0;
+ }
+
+ protected byte[] getRecordDelimiter() {
+ return DEFAULT_RECORD_DELIM;
+ }
+
/**
* Close the <code>OutputHandler</code>.
* @throws IOException
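
The new readValue()/isEndOfRow() logic lets a streamed record span multiple '\n'-terminated lines: readLine() consumes the newline, so the handler re-appends it and keeps reading until the accumulated value ends with the record delimiter (minus its trailing '\n'). A standalone sketch of the same idea, assuming the "|_" delimiter that StreamingUDFOutputHandler supplies:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    public class RecordReaderSketch {
        // Accumulates '\n'-separated lines until one ends with the record
        // delimiter, re-inserting the newline that readLine() consumed.
        static String readRecord(BufferedReader in, String recordDelim) throws IOException {
            StringBuilder value = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                if (value.length() > 0) {
                    value.append('\n'); // add back the eaten newline
                }
                value.append(line);
                if (line.endsWith(recordDelim)) {
                    return value.toString(); // end of row
                }
            }
            return value.length() == 0 ? null : value.toString();
        }

        public static void main(String[] args) throws IOException {
            BufferedReader in = new BufferedReader(
                    new StringReader("line one\nline two|_\nnext|_\n"));
            // Prints "line one" and "line two|_": the embedded newline survives.
            System.out.println(readRecord(in, "|_"));
        }
    }

The sketch checks only the last line rather than the whole buffer, which matches the handler's behavior for record delimiters that contain no '\n'.
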
Added: pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/PigStreamingUDF.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,207 @@
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.PigStreamingBase;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.ToDate;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.WritableByteArray;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.StorageUtil;
+
+import com.google.common.base.Charsets;
+
+public class PigStreamingUDF extends PigStreamingBase {
+ private static final byte PRE_WRAP_DELIM = '|';
+ private static final byte POST_WRAP_DELIM = '_';
+ private static final StreamingDelimiters DELIMS =
+ new StreamingDelimiters(PRE_WRAP_DELIM, POST_WRAP_DELIM, false);
+
+ private FieldSchema topLevelFs;
+ private static TupleFactory tupleFactory = TupleFactory.getInstance();
+ private static BagFactory bagFactory = BagFactory.getInstance();
+
+ private WritableByteArray out;
+
+ public PigStreamingUDF() {
+ out = new WritableByteArray();
+ }
+
+ public PigStreamingUDF(FieldSchema topLevelFs) {
+ out = new WritableByteArray();
+ this.topLevelFs = topLevelFs;
+ }
+
+ @Override
+ public WritableByteArray serializeToBytes(Tuple t) throws IOException {
+ out.reset();
+ int sz;
+ Object field;
+ if (t == null) {
+ sz = 0;
+ } else {
+ sz = t.size();
+ }
+ for (int i=0; i < sz; i++) {
+ field = t.get(i);
+ StorageUtil.putField(out, field, DELIMS, true);
+ if (i != sz-1) {
+ out.write(DELIMS.getParamDelim());
+ }
+ }
+ byte[] recordDel = DELIMS.getRecordEnd();
+ out.write(recordDel, 0, recordDel.length);
+ return out;
+ }
+
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return new Utf8StorageConverter();
+ }
+
+ @Override
+ public Tuple deserialize(byte[] bytes, int offset, int length) throws IOException {
+ Object o = deserialize(topLevelFs, bytes, 0 + offset, length - DELIMS.getRecordEnd().length); //Drop newline
+ return tupleFactory.newTuple(o);
+ }
+
+ public byte[] getRecordDelim() {
+ return DELIMS.getRecordEnd();
+ }
+
+ private Object deserialize(FieldSchema fs, byte[] bytes, int startIndex, int endIndex) throws IOException {
+ //If null, return null;
+ if (WritableComparator.compareBytes(
+ bytes, startIndex, DELIMS.getNull().length,
+ DELIMS.getNull(), 0, DELIMS.getNull().length) == 0) {
+ return null;
+ }
+
+ if (fs.type == DataType.BAG) {
+ return deserializeBag(fs, bytes, startIndex + 3, endIndex - 2);
+ } else if (fs.type == DataType.TUPLE) {
+ return deserializeTuple(fs, bytes, startIndex + 3, endIndex - 2);
+ } else if (fs.type == DataType.MAP) {
+ return deserializeMap(bytes, startIndex + 3, endIndex - 2);
+ }
+
+ if (fs.type == DataType.CHARARRAY) {
+ return extractString(bytes, startIndex, endIndex, true);
+ } else if (fs.type == DataType.BYTEARRAY) {
+ return new DataByteArray(bytes, startIndex, endIndex+1);
+ }
+
+ //Can we do this faster?
+ String val = extractString(bytes, startIndex, endIndex, false);
+
+ if (fs.type == DataType.LONG) {
+ return Long.valueOf(val);
+ } else if (fs.type == DataType.INTEGER) {
+ return Integer.valueOf(val);
+ } else if (fs.type == DataType.FLOAT) {
+ return Float.valueOf(val);
+ } else if (fs.type == DataType.DOUBLE) {
+ return Double.valueOf(val);
+ } else if (fs.type == DataType.BOOLEAN) {
+ return Boolean.valueOf(val);
+ } else if (fs.type == DataType.DATETIME) {
+ return ToDate.extractDateTime(val);
+ } else if (fs.type == DataType.BIGINTEGER) {
+ return new BigInteger(val);
+ } else if (fs.type == DataType.BIGDECIMAL) {
+ return new BigDecimal(val);
+ } else {
+ throw new ExecException("Can't deserialize type: " + DataType.findTypeName(fs.type));
+ }
+ }
+
+ private DataBag deserializeBag(FieldSchema fs, byte[] buf, int startIndex, int endIndex) throws IOException {
+ ArrayList<Tuple> protoBag = new ArrayList<Tuple>();
+ int depth = 0;
+ int fieldStart = startIndex;
+
+ for (int index = startIndex; index <= endIndex; index++) {
+ depth = DELIMS.updateDepth(buf, depth, index);
+ if ( StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+ protoBag.add((Tuple)deserialize(fs.schema.getField(0), buf, fieldStart, index - 1));
+ fieldStart = index + 3;
+ }
+ }
+ return bagFactory.newDefaultBag(protoBag);
+ }
+
+ private Tuple deserializeTuple(FieldSchema fs, byte[] buf, int startIndex, int endIndex) throws IOException {
+ Schema tupleSchema = fs.schema;
+
+ ArrayList<Object> protoTuple = new ArrayList<Object>(tupleSchema.size());
+ int depth = 0;
+ int fieldNum = 0;
+ int fieldStart = startIndex;
+
+
+ for (int index = startIndex; index <= endIndex; index++) {
+ depth = DELIMS.updateDepth(buf, depth, index);
+ if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+ protoTuple.add(deserialize(tupleSchema.getField(fieldNum), buf, fieldStart, index - 1));
+ fieldStart = index + 3;
+ fieldNum++;
+ }
+ }
+ return tupleFactory.newTupleNoCopy(protoTuple);
+ }
+
+ private Map<String, Object> deserializeMap(byte[] buf, int startIndex, int endIndex) throws IOException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ int depth = 0;
+ int fieldStart = startIndex;
+
+ String key = null;
+ Object val = null;
+
+ for (int index = startIndex; index <= endIndex; index++) {
+ byte currChar = buf[index];
+ depth = DELIMS.updateDepth(buf, depth, index);
+
+ if (currChar == DELIMS.getMapKeyDelim() && depth == 0) {
+ key = extractString(buf, fieldStart, index - 1, true);
+ fieldStart = index + 1;
+ }
+
+ if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
+ val = extractString(buf, fieldStart, index - 1, true);
+ map.put(key, val);
+ fieldStart = index + 3;
+ }
+ }
+
+ return map;
+ }
+
+ private String extractString(byte[] bytes, int startIndex, int endIndex, boolean useUtf8) {
+ int length = endIndex - startIndex + 1;
+ if (useUtf8) {
+ return new String(bytes, startIndex, length, Charsets.UTF_8);
+ } else {
+ return new String(bytes, startIndex, length, Charset.defaultCharset());
+ }
+ }
+
+
+
+}
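
Putting PigStreamingUDF together with the StorageUtil changes below gives the wire format: top-level fields are joined by the wrapped parameter delimiter (preWrapDelim, the delimiter byte, postWrapDelim), each scalar is prefixed with a one-byte type indicator, and the record ends with preWrapDelim, postWrapDelim, '\n'. The exact bytes in this sketch are inferred from serializeToBytes() and putField(), not from separate documentation:

    public class WireFormatSketch {
        public static void main(String[] args) {
            // Delimiters as PigStreamingUDF builds them with '|' / '_' wrapping.
            String paramDelim = "|\t_";  // between top-level fields
            String recordEnd = "|_\n";   // preWrapDelim, postWrapDelim, '\n'

            // Inferred serialization of the tuple (42, "hi"):
            // 'I' marks an integer, 'C' a chararray (see TYPE_INDICATOR below).
            String record = "I42" + paramDelim + "Chi" + recordEnd;
            System.out.print(record);   // I42|<tab>_Chi|_<newline>
        }
    }
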
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingDelimiters.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,156 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.hadoop.io.WritableComparator;
+
+public class StreamingDelimiters {
+ //RECORD_END must be \n. This assumption is baked into our logic for reading in
+ //and parsing input.
+ private static final byte RECORD_END = '\n';
+ private static final byte PARAM_DELIM = '\t';
+ private static final byte NULL_BYTE = '-';
+ private static final byte TUPLE_BEGIN = '(';
+ private static final byte TUPLE_END = ')';
+ private static final byte BAG_BEGIN = '{';
+ private static final byte BAG_END = '}';
+ private static final byte MAP_BEGIN = '[';
+ private static final byte MAP_END = ']';
+ private static final byte FIELD_DELIM = ',';
+ private static final byte MAP_KEY_VALUE_DELIM = '#'; //Not wrapped by getFullDelim
+ private byte preWrapDelim;
+ private byte postWrapDelim;
+
+ private byte[] tupleBegin;
+ private byte[] tupleEnd;
+ private byte[] bagBegin;
+ private byte[] bagEnd;
+ private byte[] mapBegin;
+ private byte[] mapEnd;
+ private byte[] fieldDelim;
+ private byte[] nullByte;
+ private byte[] paramDelim;
+ private byte[] recordEnd;
+
+ public StreamingDelimiters() {
+ this((byte) 0, (byte) 0, true);
+ }
+
+ /**
+ *
+ * @param preWrapDelim - byte written immediately before each delimiter character (0 for none)
+ * @param postWrapDelim - byte written immediately after each delimiter character
+ * @param useEmptyNull - In the past an empty field was used to serialize null, but that can
+ * make it impossible to differentiate between an empty string and null. Set
+ * to false to use a special character to represent null.
+ */
+ public StreamingDelimiters(byte preWrapDelim, byte postWrapDelim, boolean useEmptyNull) {
+ this.preWrapDelim = preWrapDelim;
+ this.postWrapDelim = postWrapDelim;
+
+ this.tupleBegin = getFullDelim(TUPLE_BEGIN);
+ this.tupleEnd = getFullDelim(TUPLE_END);
+ this.bagBegin = getFullDelim(BAG_BEGIN);
+ this.bagEnd = getFullDelim(BAG_END);
+ this.mapBegin = getFullDelim(MAP_BEGIN);
+ this.mapEnd = getFullDelim(MAP_END);
+ this.fieldDelim = getFullDelim(FIELD_DELIM);
+
+ if (useEmptyNull) {
+ this.nullByte = new byte[] {};
+ } else {
+ this.nullByte = getFullDelim(NULL_BYTE);
+ }
+
+ this.paramDelim = getFullDelim(PARAM_DELIM);
+ //recordEnd has to end with the RECORD_END byte
+ this.recordEnd = new byte[] {preWrapDelim, postWrapDelim, RECORD_END};
+ }
+
+ private byte[] getFullDelim(byte val) {
+ if (preWrapDelim == 0)
+ return new byte[] {val};
+ else
+ return new byte[] {preWrapDelim, val, postWrapDelim};
+ }
+
+ public byte[] getTupleBegin() {
+ return tupleBegin;
+ }
+
+ public byte[] getTupleEnd() {
+ return tupleEnd;
+ }
+
+ public byte[] getBagBegin() {
+ return bagBegin;
+ }
+
+ public byte[] getBagEnd() {
+ return bagEnd;
+ }
+
+ public byte[] getMapBegin() {
+ return mapBegin;
+ }
+
+ public byte[] getMapEnd() {
+ return mapEnd;
+ }
+
+ public byte[] getFieldDelim() {
+ return fieldDelim;
+ }
+
+ public byte getMapKeyDelim() {
+ return MAP_KEY_VALUE_DELIM;
+ }
+
+ public byte[] getNull() {
+ return nullByte;
+ }
+
+ public byte[] getParamDelim() {
+ return paramDelim;
+ }
+
+ public byte[] getRecordEnd() {
+ return recordEnd;
+ }
+
+ /**
+ * @return - The new depth. Depth is increased if at the end of a byte sequence
+ * that indicates the start of a bag, tuple, or map. Depth is decreased if at the
+ * end of a byte sequence that indicates the end of a bag, tuple, or map.
+ */
+ public int updateDepth(byte[] buf, int currDepth, int index) {
+ if (index < 2 || preWrapDelim == 0 || buf[index-2] != preWrapDelim || buf[index] != postWrapDelim) {
+ return currDepth;
+ }
+
+ byte delimChar = preWrapDelim == 0 ? buf[index] : buf[index-1];
+ if (delimChar == BAG_BEGIN || delimChar == TUPLE_BEGIN || delimChar == MAP_BEGIN) {
+ return currDepth + 1;
+ } else if (delimChar == BAG_END || delimChar == TUPLE_END || delimChar == MAP_END) {
+ return currDepth - 1;
+ } else {
+ return currDepth;
+ }
+ }
+
+ /**
+ *
+ * @param delimiter - the (possibly wrapped) delimiter byte sequence to look for
+ * @param buf - the buffer being scanned
+ * @param index - the current position in buf
+ * @param depth - the current nesting depth
+ * @param endIndex - the last index of the value being scanned
+ * @return - True iff depth is 0 and either index is endIndex or the bytes
+ * starting at index match the delimiter
+ */
+ public static boolean isDelimiter(byte[] delimiter, byte[] buf, int index, int depth, int endIndex) {
+ return (depth == 0 && ( index == endIndex ||
+ ( index <= endIndex - 2 &&
+ WritableComparator.compareBytes(
+ buf, index, delimiter.length,
+ delimiter, 0, delimiter.length) == 0)));
+ }
+
+ }
\ No newline at end of file
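
updateDepth() and isDelimiter() cooperate so that a field delimiter inside a nested tuple, bag, or map is ignored: depth rises on a wrapped open marker like |(_ and falls on the matching |)_, and only depth-0 matches split fields. A toy scan over a hand-built payload (the payload itself is illustrative, not produced by Pig):

    public class DepthScanSketch {
        public static void main(String[] args) {
            // A nested tuple |(_ ... |)_ followed by a plain field, with the
            // wrapped field delimiter "|,_" between them.
            String buf = "|(_Ca|,_Cb|)_|,_Cc";
            int depth = 0;
            int fieldStart = 0;
            for (int i = 0; i < buf.length(); i++) {
                // updateDepth: wrapped '(' '{' '[' opens a level; ')' '}' ']' closes one.
                if (i >= 2 && buf.charAt(i - 2) == '|' && buf.charAt(i) == '_') {
                    char c = buf.charAt(i - 1);
                    if (c == '(' || c == '{' || c == '[') depth++;
                    if (c == ')' || c == '}' || c == ']') depth--;
                }
                // isDelimiter: only split on "|,_" at depth 0.
                if (depth == 0 && buf.startsWith("|,_", i)) {
                    System.out.println(buf.substring(fieldStart, i)); // |(_Ca|,_Cb|)_
                    fieldStart = i + 3;
                }
            }
            System.out.println(buf.substring(fieldStart)); // Cc
        }
    }
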
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFException.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,61 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class StreamingUDFException extends ExecException {
+ static final long serialVersionUID = 1;
+ private String message;
+ private String language;
+ private Integer lineNumber;
+
+ public StreamingUDFException() {
+ }
+
+ public StreamingUDFException(String message) {
+ this.message = message;
+ }
+
+ public StreamingUDFException(String message, Integer lineNumber) {
+ this.message = message;
+ this.lineNumber = lineNumber;
+ }
+
+ public StreamingUDFException(String language, String message, Throwable cause) {
+ super(cause);
+ this.language = language;
+ this.message = message + "\n" + cause.getMessage() + "\n";
+ }
+
+ public StreamingUDFException(String language, String message) {
+ this(language, message, (Integer) null);
+ }
+
+ public StreamingUDFException(String language, String message, Integer lineNumber) {
+ this.language = language;
+ this.message = message;
+ this.lineNumber = lineNumber;
+ }
+
+ public String getLanguage() {
+ return language;
+ }
+
+ public Integer getLineNumber() {
+ return lineNumber;
+ }
+
+ @Override
+ public String getMessage() {
+ return this.message;
+ }
+
+ @Override
+ public String toString() {
+ //Don't modify this without also modifying Launcher.getExceptionFromStrings!
+ String s = getClass().getName();
+ String message = getMessage();
+ String lineNumber = this.getLineNumber() == null ? "" : "" + this.getLineNumber();
+ return (message != null) ? (s + ": " + "LINE " + lineNumber + ": " + message) : s;
+
+ }
+}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFInputHandler.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,10 @@
+package org.apache.pig.impl.streaming;
+
+import org.apache.pig.PigStreamingBase;
+
+public class StreamingUDFInputHandler extends DefaultInputHandler {
+
+ public StreamingUDFInputHandler(PigStreamingBase serializer) {
+ this.serializer = serializer;
+ }
+}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputHandler.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,14 @@
+package org.apache.pig.impl.streaming;
+
+
+public class StreamingUDFOutputHandler extends DefaultOutputHandler {
+
+ public StreamingUDFOutputHandler(PigStreamingUDF deserializer) {
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ protected byte[] getRecordDelimiter() {
+ return ( ((PigStreamingUDF)deserializer).getRecordDelim() );
+ }
+}
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUDFOutputSchemaException.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,16 @@
+package org.apache.pig.impl.streaming;
+
+public class StreamingUDFOutputSchemaException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ private int lineNumber;
+
+ public StreamingUDFOutputSchemaException(String message, int lineNumber) {
+ super(message);
+ this.lineNumber = lineNumber;
+ }
+
+ public int getLineNumber() {
+ return lineNumber;
+ }
+}
Added: pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java (added)
+++ pig/trunk/src/org/apache/pig/impl/streaming/StreamingUtil.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,109 @@
+package org.apache.pig.impl.streaming;
+
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.impl.util.UDFContext;
+
+public class StreamingUtil {
+ private static Log LOG = LogFactory.getLog(StreamingUtil.class);
+
+ private static final String BASH = "bash";
+ private static final String PATH = "PATH";
+
+ /**
+ * Create a ProcessBuilder for the given StreamingCommand.
+ *
+ * @param command - the streaming command to run
+ * @return a configured ProcessBuilder; the caller is responsible for starting it
+ */
+ public static ProcessBuilder createProcess(StreamingCommand command) {
+ // Set the actual command to run with 'bash -c exec ...'
+ List<String> cmdArgs = new ArrayList<String>();
+ String[] argv = command.getCommandArgs();
+
+ StringBuffer argBuffer = new StringBuffer();
+ for (String arg : argv) {
+ argBuffer.append(arg);
+ argBuffer.append(" ");
+ }
+ String argvAsString = argBuffer.toString();
+
+ if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+ cmdArgs.add("cmd");
+ cmdArgs.add("/c");
+ cmdArgs.add(argvAsString);
+ } else {
+ cmdArgs.add(BASH);
+ cmdArgs.add("-c");
+ StringBuffer sb = new StringBuffer();
+ sb.append("exec ");
+ sb.append(argvAsString);
+ cmdArgs.add(sb.toString());
+ }
+
+ // Start the external process
+ ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs
+ .toArray(new String[cmdArgs.size()]));
+ setupEnvironment(processBuilder);
+ return processBuilder;
+ }
+
+ /**
+ * Set up the run-time environment of the managed process.
+ *
+ * @param pb
+ * {@link ProcessBuilder} used to exec the process
+ */
+ private static void setupEnvironment(ProcessBuilder pb) {
+ String separator = ":";
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
+ Map<String, String> env = pb.environment();
+ addJobConfToEnvironment(conf, env);
+
+ // Add the current-working-directory to the $PATH
+ File dir = pb.directory();
+ String cwd = (dir != null) ? dir.getAbsolutePath() : System
+ .getProperty("user.dir");
+
+ String envPath = env.get(PATH);
+ if (envPath == null) {
+ envPath = cwd;
+ } else {
+ envPath = envPath + separator + cwd;
+ }
+ env.put(PATH, envPath);
+ }
+
+ protected static void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+ String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
+ LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
+ if (propsToSend == null) {
+ return;
+ }
+
+ for (String prop : propsToSend.split(",")) {
+ String value = conf.get(prop);
+ if (value == null) {
+ LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
+ continue;
+ }
+ LOG.debug("Setting property in streaming environment: " + prop);
+ envPut(env, prop, value);
+ }
+ }
+
+ private static void envPut(Map<String, String> env, String name, String value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add env entry:" + name + "=" + value);
+ }
+ env.put(name, value);
+ }
+}
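
Two details of createProcess() are worth calling out: wrapping the command in bash -c "exec ..." makes the shell replace itself with the command, so Process.destroy() (see StreamingUDF.ProcessKiller) signals the actual binary rather than an intermediate shell, and setupEnvironment() appends the working directory to $PATH so shipped scripts resolve. A runnable sketch of the non-Windows branch (POSIX only; assumes bash is on the PATH):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class BashExecSketch {
        public static void main(String[] args) throws IOException, InterruptedException {
            ProcessBuilder pb = new ProcessBuilder("bash", "-c", "exec echo hello");
            // Mirror setupEnvironment(): add the cwd to the child's $PATH.
            String path = pb.environment().getOrDefault("PATH", "");
            pb.environment().put("PATH", path + ":" + System.getProperty("user.dir"));
            Process p = pb.start();
            try (BufferedReader out = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                System.out.println(out.readLine()); // hello
            }
            System.out.println("exit: " + p.waitFor()); // exit: 0
        }
    }
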
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Sep 23 15:56:16 2013
@@ -42,8 +42,10 @@ import java.util.zip.ZipEntry;
import org.antlr.runtime.CommonTokenStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.StreamingUDF;
import org.apache.tools.bzip2r.BZip2Constants;
import org.codehaus.jackson.annotate.JsonPropertyOrder;
import org.codehaus.jackson.map.annotate.JacksonStdImpl;
@@ -132,7 +134,10 @@ public class JarManager {
*/
@SuppressWarnings("deprecation")
public static void createJar(OutputStream os, Set<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
+ JarOutputStream jarFile = new JarOutputStream(os);
+ HashMap<String, String> contents = new HashMap<String, String>();
Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
}
@@ -141,10 +146,16 @@ public class JarManager {
Class clazz = pigContext.getClassForAlias(func);
if (clazz != null) {
addContainingJar(jarList, clazz, null, pigContext);
+
+ if (clazz.getSimpleName().equals("StreamingUDF")) {
+ for (String fileName : StreamingUDF.getResourcesForJar()) {
+ InputStream in = Launcher.class.getResourceAsStream(fileName);
+ addStream(jarFile, fileName, in, contents);
+ }
+ }
}
}
- HashMap<String, String> contents = new HashMap<String, String>();
- JarOutputStream jarFile = new JarOutputStream(os);
+
Iterator<JarListEntry> it = jarList.iterator();
while (it.hasNext()) {
JarListEntry jarEntry = it.next();
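
JarManager.addStream() is not part of this hunk; for readers following along, a minimal sketch of what copying a classpath resource into an open JarOutputStream involves (the helper name and entry-name handling here are assumptions, not JarManager's actual code):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.jar.JarOutputStream;
    import java.util.zip.ZipEntry;

    public class ResourceToJarSketch {
        // Hypothetical helper: copies a classpath resource into the jar.
        static void addResource(JarOutputStream jar, String name, InputStream in)
                throws IOException {
            // Zip entry names must not start with '/'.
            jar.putNextEntry(new ZipEntry(name.startsWith("/") ? name.substring(1) : name));
            byte[] buf = new byte[8192];
            for (int n; (n = in.read(buf)) != -1; ) {
                jar.write(buf, 0, n);
            }
            jar.closeEntry();
            in.close();
        }

        public static void main(String[] args) throws IOException {
            try (JarOutputStream jar = new JarOutputStream(new FileOutputStream("demo.jar"))) {
                String name = "/python/streaming/controller.py";
                InputStream in = ResourceToJarSketch.class.getResourceAsStream(name);
                if (in != null) {
                    addResource(jar, name, in);
                }
            }
        }
    }
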
Modified: pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java Mon Sep 23 15:56:16 2013
@@ -21,12 +21,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.joda.time.DateTime;
-
import org.apache.hadoop.io.Text;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +37,10 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.streaming.StreamingDelimiters;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Charsets;
/**
* This util class provides methods that are shared by storage class
@@ -44,9 +48,23 @@ import org.apache.pig.data.TupleFactory;
*
*/
public final class StorageUtil {
-
- private static final String UTF8 = "UTF-8";
-
+ private static Map<Byte, byte[]> TYPE_INDICATOR;
+ static {
+ TYPE_INDICATOR = new HashMap<Byte, byte[]>();
+ TYPE_INDICATOR.put(DataType.BOOLEAN, new byte[] {'B'});
+ TYPE_INDICATOR.put(DataType.INTEGER, new byte[] {'I'});
+ TYPE_INDICATOR.put(DataType.LONG, new byte[] {'L'});
+ TYPE_INDICATOR.put(DataType.FLOAT, new byte[] {'F'});
+ TYPE_INDICATOR.put(DataType.DOUBLE, new byte[] {'D'});
+ TYPE_INDICATOR.put(DataType.BYTEARRAY, new byte[] {'A'});
+ TYPE_INDICATOR.put(DataType.CHARARRAY, new byte[] {'C'});
+ TYPE_INDICATOR.put(DataType.DATETIME, new byte[] {'T'});
+ TYPE_INDICATOR.put(DataType.BIGINTEGER, new byte[] {'N'});
+ TYPE_INDICATOR.put(DataType.BIGDECIMAL, new byte[] {'E'});
+ }
+
+ public static final StreamingDelimiters DEFAULT_DELIMITERS = new StreamingDelimiters();
+
/**
* Transform a <code>String</code> into a byte representing the
* field delimiter.
@@ -94,6 +112,14 @@ public final class StorageUtil {
return fieldDel;
}
+ public static void putField(OutputStream out, Object field) throws IOException {
+ putField(out, field, DEFAULT_DELIMITERS, false);
+ }
+
+ public static void putField(OutputStream out, Object field, boolean includeTypeInformation) throws IOException {
+ putField(out, field, DEFAULT_DELIMITERS, includeTypeInformation);
+ }
+
/**
* Serialize an object to an {@link OutputStream} in the
* field-delimited form.
@@ -103,113 +129,112 @@ public final class StorageUtil {
* @throws IOException if serialization fails.
*/
@SuppressWarnings("unchecked")
- public static void putField(OutputStream out, Object field)
+ public static void putField(OutputStream out, Object field, StreamingDelimiters delims, boolean includeTypeInformation)
throws IOException {
- //string constants for each delimiter
- String tupleBeginDelim = "(";
- String tupleEndDelim = ")";
- String bagBeginDelim = "{";
- String bagEndDelim = "}";
- String mapBeginDelim = "[";
- String mapEndDelim = "]";
- String fieldDelim = ",";
- String mapKeyValueDelim = "#";
-
switch (DataType.findType(field)) {
case DataType.NULL:
- break; // just leave it empty
+ out.write(delims.getNull());
+ break;
case DataType.BOOLEAN:
- out.write(((Boolean)field).toString().getBytes());
+ writeField(out, ((Boolean)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.BOOLEAN, includeTypeInformation);
break;
case DataType.INTEGER:
- out.write(((Integer)field).toString().getBytes());
+ writeField(out, ((Integer)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.INTEGER, includeTypeInformation);
break;
case DataType.LONG:
- out.write(((Long)field).toString().getBytes());
+ writeField(out, ((Long)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.LONG, includeTypeInformation);
break;
case DataType.FLOAT:
- out.write(((Float)field).toString().getBytes());
+ writeField(out, ((Float)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.FLOAT, includeTypeInformation);
break;
case DataType.DOUBLE:
- out.write(((Double)field).toString().getBytes());
+ writeField(out, ((Double)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.DOUBLE, includeTypeInformation);
break;
case DataType.BIGINTEGER:
- out.write(((BigInteger)field).toString().getBytes());
+ writeField(out, ((BigInteger)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.BIGINTEGER, includeTypeInformation);
break;
case DataType.BIGDECIMAL:
- out.write(((BigDecimal)field).toString().getBytes());
+ writeField(out, ((BigDecimal)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.BIGDECIMAL, includeTypeInformation);
break;
case DataType.DATETIME:
- out.write(((DateTime)field).toString().getBytes());
+ writeField(out, ((DateTime)field).toString().getBytes(Charset.defaultCharset()),
+ DataType.DATETIME, includeTypeInformation);
break;
case DataType.BYTEARRAY:
- byte[] b = ((DataByteArray)field).get();
- out.write(b, 0, b.length);
+ writeField(out, ((DataByteArray)field).get(),
+ DataType.BYTEARRAY, includeTypeInformation);
break;
case DataType.CHARARRAY:
- // oddly enough, writeBytes writes a string
- out.write(((String)field).getBytes(UTF8));
+ writeField(out, ((String)field).getBytes(Charsets.UTF_8),
+ DataType.CHARARRAY, includeTypeInformation);
break;
case DataType.MAP:
boolean mapHasNext = false;
Map<String, Object> m = (Map<String, Object>)field;
- out.write(mapBeginDelim.getBytes(UTF8));
+ out.write(delims.getMapBegin());
for(Map.Entry<String, Object> e: m.entrySet()) {
if(mapHasNext) {
- out.write(fieldDelim.getBytes(UTF8));
+ out.write(delims.getFieldDelim());
} else {
mapHasNext = true;
}
- putField(out, e.getKey());
- out.write(mapKeyValueDelim.getBytes(UTF8));
- putField(out, e.getValue());
+ putField(out, e.getKey(), delims, includeTypeInformation);
+ out.write(delims.getMapKeyDelim());
+ putField(out, e.getValue(), delims, includeTypeInformation);
}
- out.write(mapEndDelim.getBytes(UTF8));
+ out.write(delims.getMapEnd());
break;
case DataType.TUPLE:
boolean tupleHasNext = false;
Tuple t = (Tuple)field;
- out.write(tupleBeginDelim.getBytes(UTF8));
+ out.write(delims.getTupleBegin());
for(int i = 0; i < t.size(); ++i) {
if(tupleHasNext) {
- out.write(fieldDelim.getBytes(UTF8));
+ out.write(delims.getFieldDelim());
} else {
tupleHasNext = true;
}
try {
- putField(out, t.get(i));
+ putField(out, t.get(i), delims, includeTypeInformation);
} catch (ExecException ee) {
throw ee;
}
}
- out.write(tupleEndDelim.getBytes(UTF8));
+ out.write(delims.getTupleEnd());
break;
case DataType.BAG:
boolean bagHasNext = false;
- out.write(bagBeginDelim.getBytes(UTF8));
+ out.write(delims.getBagBegin());
Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
while(tupleIter.hasNext()) {
if(bagHasNext) {
- out.write(fieldDelim.getBytes(UTF8));
+ out.write(delims.getFieldDelim());
} else {
bagHasNext = true;
}
- putField(out, (Object)tupleIter.next());
+ putField(out, (Object)tupleIter.next(), delims, includeTypeInformation);
}
- out.write(bagEndDelim.getBytes(UTF8));
+ out.write(delims.getBagEnd());
break;
default: {
@@ -220,6 +245,14 @@ public final class StorageUtil {
}
}
+
+ private static void writeField(OutputStream out, byte[] bytes, byte dataType,
+ boolean includeTypeInformation) throws IOException {
+ if (includeTypeInformation) {
+ out.write(TYPE_INDICATOR.get(dataType));
+ }
+ out.write(bytes);
+ }
/**
* Transform a line of <code>Text</code> to a <code>Tuple</code>
@@ -260,7 +293,7 @@ public final class StorageUtil {
return TupleFactory.getInstance().newTupleNoCopy(protoTuple);
}
-
+
//---------------------------------------------------------------
// private methods
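
To make the new optional wire format concrete, here is a minimal sketch (in Python, mirroring
the script side of the protocol) of decoding the one-byte type indicators that writeField()
prepends when includeTypeInformation is true. The indicator bytes come straight from the
TYPE_INDICATOR map above; the parse_typed_field helper, and the assumption that the input has
already been split on the StreamingDelimiters field delimiters, are illustrative rather than
the actual controller implementation.

    # Hypothetical decoder for the type-indicated scalar fields produced by
    # StorageUtil.writeField(). Assumes fields were already split on the
    # StreamingDelimiters field delimiters; only scalar types are handled.
    from decimal import Decimal

    def parse_typed_field(field):
        indicator, payload = field[0], field[1:]
        if indicator == 'B':                 # boolean: "true"/"false"
            return payload == 'true'
        if indicator in ('I', 'L', 'N'):     # integer, long, biginteger
            return int(payload)
        if indicator in ('F', 'D'):          # float, double
            return float(payload)
        if indicator == 'E':                 # bigdecimal
            return Decimal(payload)
        if indicator in ('C', 'T'):          # chararray, datetime (ISO-8601 text)
            return payload
        if indicator == 'A':                 # bytearray: raw bytes, left as-is
            return payload
        raise ValueError("unknown type indicator: %r" % indicator)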
Modified: pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Sep 23 15:56:16 2013
@@ -50,7 +50,9 @@ public abstract class ScriptEngine {
jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
- groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine");
+ groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine"),
+ streaming_python(new String[]{"streaming_python"}, new String[]{}, "org.apache.pig.scripting.streaming.python.PythonScriptEngine");
+
private static Set<String> supportedScriptLangs;
static {
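
With the streaming_python entry in place, the keyword becomes available to Pig's register
statement and resolves to the PythonScriptEngine added below. A hypothetical script (file and
alias names here are illustrative) would be registered as:

    register 'myudfs.py' using streaming_python as myfuncs;

Unlike the jython entry above it, this engine runs the functions in an external CPython
process instead of reimplementing them on the JVM, which is the point of PIG-2417.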
Added: pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,127 @@
+package org.apache.pig.scripting;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Charsets;
+
+/**
+ * This class helps a scripting UDF capture user output by managing when to capture output and
+ * where the output is written.
+ *
+ * For ILLUSTRATE, we only capture output for the last run (with the final set of data), and we
+ * need to keep track of the file containing that output so it can be returned with the
+ * illustrate results. For normal runs, all standard output is written to the user logs.
+ */
+public class ScriptingOutputCapturer {
+ private static Log log = LogFactory.getLog(ScriptingOutputCapturer.class);
+
+ private static Map<String, String> outputFileNames = new HashMap<String, String>();
+ private static String runId = UUID.randomUUID().toString(); //Unique ID for this run so UDF output files aren't corrupted by leftovers from previous runs.
+
+ //Illustrate sets the static flag telling the UDF to start capturing its output. It's up to each
+ //instance to react to it and set its own flag.
+ private static boolean captureOutput = false;
+ private boolean instancedCapturingOutput = false;
+
+ private ExecType execType;
+
+ public ScriptingOutputCapturer(ExecType execType) {
+ this.execType = execType;
+ }
+
+ public String getStandardOutputRootWriteLocation() {
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
+
+ String jobId = conf.get("mapred.job.id");
+ String taskId = conf.get("mapred.task.id");
+ log.debug("JobId: " + jobId);
+ log.debug("TaskId: " + taskId);
+
+ if (execType.isLocal()) {
+ String logDir = System.getProperty("pig.udf.scripting.log.dir");
+ if (logDir == null)
+ logDir = ".";
+ return logDir + "/" + (taskId == null ? "" : (taskId + "_"));
+ } else {
+ String taskLogDir = getTaskLogDir(jobId, taskId);
+ return taskLogDir + "/";
+ }
+ }
+
+ public String getTaskLogDir(String jobId, String taskId) {
+ String taskLogDir;
+ String hadoopLogDir = System.getProperty("hadoop.log.dir");
+ String defaultUserLogDir = hadoopLogDir + File.separator + "userlogs";
+
+ if ( new File(defaultUserLogDir + File.separator + jobId).exists() ) {
+ taskLogDir = defaultUserLogDir + File.separator + jobId + File.separator + taskId;
+ } else if ( new File(defaultUserLogDir + File.separator + taskId).exists() ) {
+ taskLogDir = defaultUserLogDir + File.separator + taskId;
+ } else if ( new File(defaultUserLogDir).exists() ){
+ taskLogDir = defaultUserLogDir;
+ } else {
+ taskLogDir = hadoopLogDir + File.separator + "udfOutput";
+ }
+ return taskLogDir;
+ }
+
+ public static void startCapturingOutput() {
+ ScriptingOutputCapturer.captureOutput = true;
+ }
+
+ public static Map<String, String> getUdfOutput() throws IOException {
+ Map<String, String> udfFuncNameToOutput = new HashMap<String,String>();
+ for (Map.Entry<String, String> funcToOutputFileName : outputFileNames.entrySet()) {
+ StringBuffer udfOutput = new StringBuffer();
+ FileInputStream fis = new FileInputStream(funcToOutputFileName.getValue());
+ Reader fr = new InputStreamReader(fis, Charsets.UTF_8);
+ BufferedReader br = new BufferedReader(fr);
+
+ try {
+ String line = br.readLine();
+ while (line != null) {
+ udfOutput.append("\t" + line + "\n");
+ line = br.readLine();
+ }
+ } finally {
+ br.close();
+ }
+ udfFuncNameToOutput.put(funcToOutputFileName.getKey(), udfOutput.toString());
+ }
+ return udfFuncNameToOutput;
+ }
+
+ public void registerOutputLocation(String functionName, String fileName) {
+ outputFileNames.put(functionName, fileName);
+ }
+
+ public static String getRunId() {
+ return runId;
+ }
+
+ public static boolean isClassCapturingOutput() {
+ return ScriptingOutputCapturer.captureOutput;
+ }
+
+ public boolean isInstanceCapturingOutput() {
+ return this.instancedCapturingOutput;
+ }
+
+ public void setInstanceCapturingOutput(boolean instanceCapturingOutput) {
+ this.instancedCapturingOutput = instanceCapturingOutput;
+ }
+}
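
As a sketch of what the capturer collects: anything the script function writes to standard
output. Assuming the outputSchema decorator shipped in pig_util.py (the function below is
illustrative), a stray print ends up either in the task's user logs or, under ILLUSTRATE, in
the per-function output returned by getUdfOutput():

    from pig_util import outputSchema

    @outputSchema('len:int')
    def length(s):
        # expected to be redirected to the file registered via
        # registerOutputLocation() and surfaced by getUdfOutput()
        print("input was: %r" % s)
        return len(s)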
Added: pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java?rev=1525632&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java Mon Sep 23 15:56:16 2013
@@ -0,0 +1,111 @@
+package org.apache.pig.scripting.streaming.python;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+
+public class PythonScriptEngine extends ScriptEngine {
+ private static final Log log = LogFactory.getLog(PythonScriptEngine.class);
+
+ @Override
+ public void registerFunctions(String path, String namespace,
+ PigContext pigContext) throws IOException {
+
+ String fileName = path.substring(0, path.length() - ".py".length());
+ log.debug("Path: " + path + " FileName: " + fileName + " Namespace: " + namespace);
+ File f = new File(path);
+
+ if (!f.canRead()) {
+ throw new IOException("Can't read file: " + path);
+ }
+
+ FileInputStream fin = new FileInputStream(f);
+ List<String[]> functions = getFunctions(fin);
+ namespace = namespace == null ? "" : namespace + NAMESPACE_SEPARATOR;
+ for(String[] functionInfo : functions) {
+ String name = functionInfo[0];
+ String schemaString = functionInfo[1];
+ String schemaLineNumber = functionInfo[2];
+ String alias = namespace + name;
+ String execType = (pigContext.getExecType() == ExecType.LOCAL ? "local" : "mapreduce");
+ String isIllustrate = (Boolean.valueOf(pigContext.inIllustrator)).toString();
+ log.debug("Registering Function: " + alias);
+ pigContext.registerFunction(alias,
+ new FuncSpec("StreamingUDF",
+ new String[] {
+ "python",
+ fileName, name,
+ schemaString, schemaLineNumber,
+ execType, isIllustrate
+ }));
+ }
+ fin.close();
+ }
+
+ @Override
+ protected Map<String, List<PigStats>> main(PigContext context,
+ String scriptFile) throws IOException {
+ log.warn("ScriptFile: " + scriptFile);
+ registerFunctions(scriptFile, null, context);
+ return getPigStatsMap();
+ }
+
+ @Override
+ protected String getScriptingLang() {
+ return "streaming_python";
+ }
+
+ @Override
+ protected Map<String, Object> getParamsFromVariables() throws IOException {
+ throw new IOException("Unsupported Operation");
+ }
+
+ private static final Pattern pSchema = Pattern.compile("^\\s*\\W+outputSchema.*");
+ private static final Pattern pDef = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");
+
+ private static List<String[]> getFunctions(InputStream is) throws IOException {
+ List<String[]> functions = new ArrayList<String[]>();
+ InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset());
+ BufferedReader br = new BufferedReader(in);
+ String line = br.readLine();
+ String schemaString = null;
+ String schemaLineNumber = null;
+ int lineNumber = 1;
+ while (line != null) {
+ if (pSchema.matcher(line).matches()) {
+ int start = line.indexOf("(") + 2; //skip the open paren and the opening quote
+ int end = line.lastIndexOf(")") - 1;
+ schemaString = line.substring(start,end).trim();
+ schemaLineNumber = "" + lineNumber;
+ } else if (pDef.matcher(line).matches()) {
+ int start = line.indexOf("def ") + "def ".length();
+ int end = line.indexOf('(');
+ String functionName = line.substring(start, end).trim();
+ if (schemaString != null) {
+ String[] funcInfo = {functionName, schemaString, schemaLineNumber};
+ functions.add(funcInfo);
+ schemaString = null;
+ }
+ }
+ line = br.readLine();
+ lineNumber++;
+ }
+ return functions;
+ }
+}
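
For reference, a minimal script that getFunctions() above would accept, assuming the
outputSchema decorator from pig_util.py: pSchema matches the decorator line, whose quoted
argument and line number are held until pDef matches the next def, at which point the pair is
recorded; a def with no preceding schema line is skipped.

    from pig_util import outputSchema

    @outputSchema('word:chararray')   # pSchema match: schema string and line number held
    def reverse(word):                # pDef match: recorded with the schema above
        return word[::-1]

    def internal_helper(s):           # no preceding schema line, so skipped
        return s.strip()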