You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/10/31 18:51:55 UTC
svn commit: r1404274 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/streaming/ExecutableManager.java
test/org/apache/pig/test/TestStreaming.java
Author: cheolsoo
Date: Wed Oct 31 17:51:54 2012
New Revision: 1404274
URL: http://svn.apache.org/viewvc?rev=1404274&view=rev
Log:
PIG-2973: TestStreaming test times out (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1404274&r1=1404273&r2=1404274&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 31 17:51:54 2012
@@ -336,6 +336,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2973: TestStreaming test times out (cheolsoo)
+
PIG-3001: TestExecutableManager.testAddJobConfToEnv fails randomly (cheolsoo)
PIG-3017: Pig's object serialization should use compression (jcoveney)
Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1404274&r1=1404273&r2=1404274&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Wed Oct 31 17:51:54 2012
@@ -319,8 +319,12 @@ public class ExecutableManager {
public void run() throws IOException {
// Check if we need to exec the process NOW ...
if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
- // start the thread to handle input
- fileInputThread = new ProcessInputThread(inputHandler, poStream);
+ // start the thread to handle input. We pass the UDFContext to the
+ // fileInputThread because when the input type is asynchronous,
+ // exec() is called by fileInputThread, which needs access to
+ // the UDFContext.
+ fileInputThread = new ProcessInputThread(
+ inputHandler, poStream, UDFContext.getUDFContext());
fileInputThread.start();
// If Input type is ASYNCHRONOUS that means input to the
@@ -338,7 +342,7 @@ public class ExecutableManager {
inputHandler.bindTo(stdin);
// Start the thread to send input to the executable's stdin
- stdinThread = new ProcessInputThread(inputHandler, poStream);
+ stdinThread = new ProcessInputThread(inputHandler, poStream, null);
stdinThread.start();
}
@@ -350,12 +354,15 @@ public class ExecutableManager {
InputHandler inputHandler;
private POStream poStream;
+ private UDFContext udfContext;
private BlockingQueue<Result> binaryInputQueue;
- ProcessInputThread(InputHandler inputHandler, POStream poStream) {
+ ProcessInputThread(InputHandler inputHandler, POStream poStream, UDFContext udfContext) {
setDaemon(true);
this.inputHandler = inputHandler;
this.poStream = poStream;
+ // a copy of UDFContext passed from the ExecutableManager thread
+ this.udfContext = udfContext;
// the input queue from where this thread will read
// input tuples
this.binaryInputQueue = poStream.getBinaryInputQueue();
@@ -363,6 +370,13 @@ public class ExecutableManager {
@Override
public void run() {
+ // If input type is asynchronous, set the udfContext of the current
+ // thread to the copy of ExecutableManager thread's udfContext. This
+ // is necessary because the exec() method is called by the current
+ // thread (fileInputThread) instead of the ExecutableManager thread.
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS && udfContext != null) {
+ UDFContext.setUdfContext(udfContext);
+ }
try {
// Read tuples from the previous operator in the pipeline
// and pass it to the executable
Modified: pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=1404274&r1=1404273&r2=1404274&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed Oct 31 17:51:54 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Random;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
@@ -40,6 +41,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
public class TestStreaming {
private static final MiniCluster cluster = MiniCluster.buildCluster();
@@ -803,6 +806,77 @@ public class TestStreaming {
Assert.assertFalse(Util.exists(pig.getPigContext(), "output_dir_001/_logs/mycmd"));
}
+
+ /**
+ * PIG-2973: Verify that JobConf is added to environment even when input to
+ * the streaming binary is asynchronous (i.e. it is from a file).
+ */
+ @Test
+ public void testAddJobConfToEnvironmentWithASynchInput() throws Exception {
+ File input = Util.createInputFile("tmp", "", new String[] {"A"});
+
+ // Generate a random number that will be passed via an environment
+ // variable to the streaming process
+ Random rand = new Random();
+ final int ENV_VAR_VALUE = rand.nextInt();
+ final String ENV_VAR_NAME = "MY_RANDOM_NUMBER";
+
+ // Perl script
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "while (<INFILE>) {",
+ " chomp $_;",
+ // Append the value of the environment variable to the line
+ " print STDOUT \"$_,$ENV{'" + ENV_VAR_NAME + "'}\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ "}",
+ };
+ File command = Util.createInputFile("script", "pl", script);
+
+ // Expected results
+ String[] expectedFirstFields = new String[] {"A"};
+ Integer[] expectedSecondFields = new Integer[] {ENV_VAR_VALUE};
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ Util.toDataByteArrays(expectedSecondFields));
+
+ // Set a property and pass it via environment variable to the streaming process
+ pigServer.getPigContext().getProperties()
+ .setProperty(PIG_STREAMING_ENVIRONMENT, ENV_VAR_NAME);
+ pigServer.getPigContext().getProperties()
+ .setProperty(ENV_VAR_NAME, Integer.toString(ENV_VAR_VALUE));
+
+ // Pig query to run
+ pigServer.registerQuery(
+ "define CMD `" + command.getName() + " foo` " +
+ "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+ "input('foo' using " + PigStreaming.class.getName() + "()) " +
+ "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load '"
+ + Util.generateURI(Util.encodeEscape(input.toString()),
+ pigServer.getPigContext())
+ + "' using PigStorage();");
+ pigServer.registerQuery("STREAMED_DATA = stream IP through CMD;");
+
+ String output = "/pig/out";
+ pigServer.deleteFile(output);
+ pigServer.store("STREAMED_DATA", output, PigStorage.class.getName() + "(',')");
+
+ pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
+ Iterator<Tuple> iter = pigServer.openIterator("A");
+
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ while (iter.hasNext()) {
+ outputs.add(iter.next());
+ }
+
+ // Check the collected results against the expected tuples
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+ }
+
public static class PigStreamDump implements PigToStream {
public static final String recordDelimiter = "\n";