You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/09/13 23:26:28 UTC
svn commit: r1384546 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/streaming/ExecutableManager.java
test/org/apache/pig/impl/ test/org/apache/pig/impl/streaming/
test/org/apache/pig/impl/streaming/TestExecutableManager.java
Author: dvryaboy
Date: Thu Sep 13 21:26:28 2012
New Revision: 1384546
URL: http://svn.apache.org/viewvc?rev=1384546&view=rev
Log:
PIG-2900: Streaming should provide conf settings in the environment
Added:
pig/trunk/test/org/apache/pig/impl/
pig/trunk/test/org/apache/pig/impl/streaming/
pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1384546&r1=1384545&r2=1384546&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 13 21:26:28 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2900: Streaming should provide conf settings in the environment (dvryaboy)
+
PIG-2353: RANK function like in SQL (xalan via azaroth)
PIG-2915: Builtin TOP udf is sensitive to null input bags (hazen via dvryaboy)
Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1384546&r1=1384545&r2=1384546&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Sep 13 21:26:28 2012
@@ -27,13 +27,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -43,11 +44,12 @@ import org.apache.pig.impl.io.BufferedPo
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.streaming.InputHandler.InputType;
import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+import org.apache.pig.impl.util.UDFContext;
/**
* {@link ExecutableManager} manages an external executable which processes data
* in a Pig query.
- *
+ *
* The <code>ExecutableManager</code> is responsible for startup/teardown of
* the external process and also for managing it. It feeds input records to the
* executable via its <code>stdin</code>, collects the output records from
@@ -61,7 +63,7 @@ public class ExecutableManager {
private static final String PATH = "PATH";
private static final String BASH = "bash";
private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
-
+
protected StreamingCommand command; // Streaming command to be run
String argvAsString; // Parsed commands
@@ -70,10 +72,10 @@ public class ExecutableManager {
protected DataOutputStream stdin; // stdin of the process
ProcessInputThread stdinThread; // thread to send input to process
-
+
ProcessOutputThread stdoutThread; // thread to get process stdout
InputStream stdout; // stdout of the process
-
+
ProcessErrorThread stderrThread; // thread to get process stderr
InputStream stderr; // stderr of the process
@@ -90,7 +92,7 @@ public class ExecutableManager {
protected volatile Throwable outerrThreadsError;
private POStream poStream;
private ProcessInputThread fileInputThread;
-
+
/**
* Create a new {@link ExecutableManager}.
*/
@@ -99,7 +101,7 @@ public class ExecutableManager {
/**
* Configure and initialize the {@link ExecutableManager}.
- *
+ *
* @param stream POStream operator
* @throws IOException
* @throws ExecException
@@ -121,7 +123,7 @@ public class ExecutableManager {
/**
* Close and cleanup the {@link ExecutableManager}.
- * @throws IOException
+ * @throws IOException
*/
public void close() throws IOException {
// Close the InputHandler, which in some cases lets the process
@@ -140,7 +142,7 @@ public class ExecutableManager {
LOG.error("Unexpected exception while waiting for streaming binary to complete", ie);
killProcess(process);
}
-
+
// Wait for stdout thread to complete
try {
if (stdoutThread != null) {
@@ -151,7 +153,7 @@ public class ExecutableManager {
LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie);
killProcess(process);
}
-
+
// Wait for stderr thread to complete
try {
if (stderrThread != null) {
@@ -173,12 +175,12 @@ public class ExecutableManager {
// Trigger the outputHandler
outputHandler.bindTo("", null, 0, -1);
-
+
// start thread to process output from executable's stdout
stdoutThread = new ProcessOutputThread(outputHandler, poStream);
stdoutThread.start();
}
-
+
// Check if there was a problem with the managed process
if (outerrThreadsError != null) {
LOG.error("Output/Error thread failed with: "
@@ -191,23 +193,27 @@ public class ExecutableManager {
* Helper function to close input and output streams
* to the process and kill it
* @param process the process to be killed
- * @throws IOException
+ * @throws IOException
*/
private void killProcess(Process process) throws IOException {
- inputHandler.close(process);
- outputHandler.close();
- process.destroy();
+ if (process != null) {
+ inputHandler.close(process);
+ outputHandler.close();
+ process.destroy();
+ }
}
/**
* Set up the run-time environment of the managed process.
- *
+ *
* @param pb
* {@link ProcessBuilder} used to exec the process
*/
protected void setupEnvironment(ProcessBuilder pb) {
String separator = ":";
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
Map<String, String> env = pb.environment();
+ addJobConfToEnvironment(conf, env);
// Add the current-working-directory to the $PATH
File dir = pb.directory();
@@ -231,13 +237,48 @@ public class ExecutableManager {
env.put(PATH, envPath);
}
+ void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+ Iterator<Map.Entry<String, String>> it = conf.iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, String> en = it.next();
+ String name = en.getKey();
+ //String value = (String)en.getValue(); // does not apply variable expansion
+ String value = conf.get(name); // does variable expansion
+ name = safeEnvVarName(name);
+ envPut(env, name, value);
+ }
+ }
+
+ String safeEnvVarName(String var) {
+ StringBuffer safe = new StringBuffer();
+ int len = var.length();
+ for (int i = 0; i < len; i++) {
+ char c = var.charAt(i);
+ char s;
+ if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+ s = c;
+ } else {
+ s = '_';
+ }
+ safe.append(s);
+ }
+ return safe.toString();
+ }
+
+ void envPut(Map<String, String> env, String name, String value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add env entry:" + name + "=" + value);
+ }
+ env.put(name, value);
+ }
+
/**
* Start execution of the external process.
- *
+ *
* This takes care of setting up the environment of the process and also
* starts ProcessErrorThread to process the <code>stderr</code> of
* the managed process.
- *
+ *
* @throws IOException
*/
protected void exec() throws IOException {
@@ -273,7 +314,7 @@ public class ExecutableManager {
// Bind the stdout to the OutputHandler
outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
0, Long.MAX_VALUE);
-
+
// start thread to process output from executable's stdout
stdoutThread = new ProcessOutputThread(outputHandler, poStream);
stdoutThread.start();
@@ -282,7 +323,7 @@ public class ExecutableManager {
/**
* Start execution of the {@link ExecutableManager}.
- *
+ *
* @throws IOException
*/
public void run() throws IOException {
@@ -291,7 +332,7 @@ public class ExecutableManager {
// start the thread to handle input
fileInputThread = new ProcessInputThread(inputHandler, poStream);
fileInputThread.start();
-
+
// If Input type is ASYNCHRONOUS that means input to the
// streaming binary is from a file - that means we cannot exec
// the process till the input file is completely written. This
@@ -326,10 +367,11 @@ public class ExecutableManager {
this.inputHandler = inputHandler;
this.poStream = poStream;
// the input queue from where this thread will read
- // input tuples
+ // input tuples
this.binaryInputQueue = poStream.getBinaryInputQueue();
}
+ @Override
public void run() {
try {
// Read tuples from the previous operator in the pipeline
@@ -339,8 +381,8 @@ public class ExecutableManager {
inp = binaryInputQueue.take();
synchronized (poStream) {
// notify waiting producer
- // the if check is to keep "findbugs"
- // happy
+ // the if check is to keep "findbugs"
+ // happy
if(inp != null)
poStream.notifyAll();
}
@@ -365,7 +407,7 @@ public class ExecutableManager {
Tuple t = null;
try {
t = (Tuple) inp.result;
- inputHandler.putNext(t);
+ inputHandler.putNext(t);
} catch (IOException e) {
// if input type is synchronous then it could
// be related to the process terminating
@@ -391,7 +433,7 @@ public class ExecutableManager {
// Generally the ProcessOutputThread would do this but now
// we should do it here since neither the process nor the
// ProcessOutputThread will ever be spawned
- Result res = new Result(POStatus.STATUS_ERR,
+ Result res = new Result(POStatus.STATUS_ERR,
"Exception while trying to write to stream binary's input" + e.getMessage());
sendOutput(poStream.getBinaryOutputQueue(), res);
throw e;
@@ -402,8 +444,8 @@ public class ExecutableManager {
}
}
} catch (Throwable t) {
-
-
+
+
// Note that an error occurred
outerrThreadsError = t;
LOG.error(t);
@@ -412,7 +454,7 @@ public class ExecutableManager {
} catch (IOException ioe) {
LOG.warn(ioe);
}
- }
+ }
}
}
@@ -430,7 +472,7 @@ public class ExecutableManager {
}
}
}
-
+
/**
* The thread which gets output from the streaming binary and puts it onto
* the binary output Queue of POStream
@@ -448,6 +490,7 @@ public class ExecutableManager {
this.binaryOutputQueue = poStream.getBinaryOutputQueue();
}
+ @Override
public void run() {
try {
// Read tuples from the executable and send it to
@@ -464,12 +507,12 @@ public class ExecutableManager {
// Note that an error occurred
outerrThreadsError = t;
LOG.error("Caught Exception in OutputHandler of Streaming binary, " +
- "sending error signal to pipeline", t);
+ "sending error signal to pipeline", t);
// send ERROR to POStream
try {
Result res = new Result();
- res.result = "Error reading output from Streaming binary:" +
- "'" + argvAsString + "':" + t.getMessage();
+ res.result = "Error reading output from Streaming binary:" +
+ "'" + argvAsString + "':" + t.getMessage();
res.returnStatus = POStatus.STATUS_ERR;
sendOutput(binaryOutputQueue, res);
killProcess(process);
@@ -481,7 +524,7 @@ public class ExecutableManager {
void processOutput(Tuple t) {
Result res = new Result();
-
+
if (t != null) {
// we have a valid tuple to pass back
res.result = t;
@@ -498,12 +541,12 @@ public class ExecutableManager {
killProcess(process);
} catch (IOException e) {
LOG.warn("Exception trying to kill process while processing null output " +
- "from binary", e);
-
+ "from binary", e);
+
}
// signal error
String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
- ie.getMessage();
+ ie.getMessage();
LOG.error(errMsg, ie);
res.result = errMsg;
res.returnStatus = POStatus.STATUS_ERR;
@@ -515,27 +558,27 @@ public class ExecutableManager {
res = EOS_RESULT;
} else {
// signal Error
-
- String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
- + exitCode;
+
+ String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
+ + exitCode;
LOG.error(errMsg);
res.result = errMsg;
res.returnStatus = POStatus.STATUS_ERR;
}
}
sendOutput(binaryOutputQueue, res);
-
+
}
}
-
-
-
+
+
+
/**
* Workhorse to process the stderr stream of the managed process.
- *
+ *
* By default <code>ExecutableManager</code> just sends out the received
* error message to the <code>stderr</code> of itself.
- *
+ *
* @param error
* error message from the managed process.
*/
@@ -550,6 +593,7 @@ public class ExecutableManager {
setDaemon(true);
}
+ @Override
public void run() {
try {
String error;
Added: pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java?rev=1384546&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java (added)
+++ pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java Thu Sep 13 21:26:28 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestExecutableManager {
+
+ @Test
+ public void testSafeEnvVarName() {
+ ExecutableManager manager = new ExecutableManager();
+ assertEquals("foo", manager.safeEnvVarName("foo"));
+ assertEquals("", manager.safeEnvVarName(""));
+ assertEquals("foo_bar",manager.safeEnvVarName("foo.bar"));
+ assertEquals("foo_bar",manager.safeEnvVarName("foo$bar"));
+ assertEquals("foo_",manager.safeEnvVarName("foo "));
+ }
+
+ @Test
+ public void testAddJobConfToEnv() {
+ Configuration conf = new Configuration();
+ conf.set("foo", "bar");
+ Map<String, String> env = new HashMap<String, String>();
+ ExecutableManager manager = new ExecutableManager();
+ manager.addJobConfToEnvironment(conf, env);
+ assertTrue(env.containsKey("hadoop_tmp_dir"));
+ assertEquals("bar", env.get("foo"));
+ }
+}