You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/10/31 18:48:59 UTC

svn commit: r1404269 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/impl/streaming/ExecutableManager.java test/org/apache/pig/test/TestStreaming.java

Author: cheolsoo
Date: Wed Oct 31 17:48:59 2012
New Revision: 1404269

URL: http://svn.apache.org/viewvc?rev=1404269&view=rev
Log:
PIG-2973: TestStreaming test times out (cheolsoo)

Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java
    pig/branches/branch-0.11/test/org/apache/pig/test/TestStreaming.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1404269&r1=1404268&r2=1404269&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Wed Oct 31 17:48:59 2012
@@ -310,6 +310,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2973: TestStreaming test times out (cheolsoo)
+
 PIG-3001: TestExecutableManager.testAddJobConfToEnv fails randomly (cheolsoo)
 
 PIG-3017: Pig's object serialization should use compression (jcoveney)

Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1404269&r1=1404268&r2=1404269&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java Wed Oct 31 17:48:59 2012
@@ -319,8 +319,12 @@ public class ExecutableManager {
     public void run() throws IOException {
         // Check if we need to exec the process NOW ...
         if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
-            // start the thread to handle input
-            fileInputThread = new ProcessInputThread(inputHandler, poStream);
+            // start the thread to handle input. we pass the UDFContext to the
+            // fileInputThread because when input type is asynchronous, the
+            // exec() is called by fileInputThread, and it needs to access to
+            // the UDFContext.
+            fileInputThread = new ProcessInputThread(
+                    inputHandler, poStream, UDFContext.getUDFContext());
             fileInputThread.start();
 
             // If Input type is ASYNCHRONOUS that means input to the
@@ -338,7 +342,7 @@ public class ExecutableManager {
         inputHandler.bindTo(stdin);
 
         // Start the thread to send input to the executable's stdin
-        stdinThread = new ProcessInputThread(inputHandler, poStream);
+        stdinThread = new ProcessInputThread(inputHandler, poStream, null);
         stdinThread.start();
     }
 
@@ -350,12 +354,15 @@ public class ExecutableManager {
 
         InputHandler inputHandler;
         private POStream poStream;
+        private UDFContext udfContext;
         private BlockingQueue<Result> binaryInputQueue;
 
-        ProcessInputThread(InputHandler inputHandler, POStream poStream) {
+        ProcessInputThread(InputHandler inputHandler, POStream poStream, UDFContext udfContext) {
             setDaemon(true);
             this.inputHandler = inputHandler;
             this.poStream = poStream;
+            // a copy of UDFContext passed from the ExecutableManager thread
+            this.udfContext = udfContext;
             // the input queue from where this thread will read
             // input tuples
             this.binaryInputQueue = poStream.getBinaryInputQueue();
@@ -363,6 +370,13 @@ public class ExecutableManager {
 
         @Override
         public void run() {
+            // If input type is asynchronous, set the udfContext of the current
+            // thread to the copy of ExecutableManager thread's udfContext. This
+            // is necessary because the exec() method is called by the current
+            // thread (fileInputThread) instead of the ExecutableManager thread.
+            if (inputHandler.getInputType() == InputType.ASYNCHRONOUS && udfContext != null) {
+                UDFContext.setUdfContext(udfContext);
+            }
             try {
                 // Read tuples from the previous operator in the pipeline
                 // and pass it to the executable

Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestStreaming.java?rev=1404269&r1=1404268&r2=1404269&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestStreaming.java Wed Oct 31 17:48:59 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
@@ -40,6 +41,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
 public class TestStreaming {
 
     private static final MiniCluster cluster = MiniCluster.buildCluster();
@@ -803,6 +806,77 @@ public class TestStreaming {
         Assert.assertFalse(Util.exists(pig.getPigContext(), "output_dir_001/_logs/mycmd"));
 
     }
+
+    /**
+     * PIG-2973: Verify that JobConf is added to environment even when input to
+     * the streaming binary is asynchronous (i.e. it is from a file).
+     */
+    @Test
+    public void testAddJobConfToEnvironmentWithASynchInput() throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {"A"});
+
+        // Generate a random number that will be passed via an environment
+        // variable to the streaming process
+        Random rand = new Random();
+        final int ENV_VAR_VALUE = rand.nextInt();
+        final String ENV_VAR_NAME = "MY_RANDOM_NUMBER";
+
+        // Perl script
+        String[] script =
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          // Append the value of the environment variable to the line
+                          "  print STDOUT \"$_,$ENV{'" + ENV_VAR_NAME + "'}\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"A"};
+        Integer[] expectedSecondFields = new Integer[] {ENV_VAR_VALUE};
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+                                     Util.toDataByteArrays(expectedSecondFields));
+
+        // Set a property and pass it via environment variable to the streaming process
+        pigServer.getPigContext().getProperties()
+                 .setProperty(PIG_STREAMING_ENVIRONMENT, ENV_VAR_NAME);
+        pigServer.getPigContext().getProperties()
+                 .setProperty(ENV_VAR_NAME, Integer.toString(ENV_VAR_VALUE));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "input('foo' using " + PigStreaming.class.getName() + "()) " +
+                "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery("IP = load '"
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext())
+                + "' using PigStorage();");
+        pigServer.registerQuery("STREAMED_DATA = stream IP through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("STREAMED_DATA", output, PigStorage.class.getName() + "(',')");
+
+        pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        while (iter.hasNext()) {
+            outputs.add(iter.next());
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
     public static class PigStreamDump implements PigToStream {
 
         public static final String recordDelimiter = "\n";