You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/09/13 23:26:28 UTC

svn commit: r1384546 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/streaming/ExecutableManager.java test/org/apache/pig/impl/ test/org/apache/pig/impl/streaming/ test/org/apache/pig/impl/streaming/TestExecutableManager.java

Author: dvryaboy
Date: Thu Sep 13 21:26:28 2012
New Revision: 1384546

URL: http://svn.apache.org/viewvc?rev=1384546&view=rev
Log:
PIG-2900: Streaming should provide conf settings in the environment

Added:
    pig/trunk/test/org/apache/pig/impl/
    pig/trunk/test/org/apache/pig/impl/streaming/
    pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1384546&r1=1384545&r2=1384546&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 13 21:26:28 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2900: Streaming should provide conf settings in the environment (dvryaboy)
+
 PIG-2353: RANK function like in SQL (xalan via azaroth)
 
 PIG-2915: Builtin TOP udf is sensitive to null input bags (hazen via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1384546&r1=1384545&r2=1384546&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Sep 13 21:26:28 2012
@@ -27,13 +27,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -43,11 +44,12 @@ import org.apache.pig.impl.io.BufferedPo
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.streaming.InputHandler.InputType;
 import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * {@link ExecutableManager} manages an external executable which processes data
  * in a Pig query.
- * 
+ *
  * The <code>ExecutableManager</code> is responsible for startup/teardown of
  * the external process and also for managing it. It feeds input records to the
  * executable via its <code>stdin</code>, collects the output records from
@@ -61,7 +63,7 @@ public class ExecutableManager {
     private static final String PATH = "PATH";
     private static final String BASH = "bash";
     private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
-    
+
     protected StreamingCommand command; // Streaming command to be run
     String argvAsString; // Parsed commands
 
@@ -70,10 +72,10 @@ public class ExecutableManager {
 
     protected DataOutputStream stdin; // stdin of the process
     ProcessInputThread stdinThread; // thread to send input to process
-    
+
     ProcessOutputThread stdoutThread; // thread to get process stdout
     InputStream stdout; // stdout of the process
-    
+
     ProcessErrorThread stderrThread; // thread to get process stderr
     InputStream stderr; // stderr of the process
 
@@ -90,7 +92,7 @@ public class ExecutableManager {
     protected volatile Throwable outerrThreadsError;
     private POStream poStream;
     private ProcessInputThread fileInputThread;
-    
+
     /**
      * Create a new {@link ExecutableManager}.
      */
@@ -99,7 +101,7 @@ public class ExecutableManager {
 
     /**
      * Configure and initialize the {@link ExecutableManager}.
-     * 
+     *
      * @param stream POStream operator
      * @throws IOException
      * @throws ExecException
@@ -121,7 +123,7 @@ public class ExecutableManager {
 
     /**
      * Close and cleanup the {@link ExecutableManager}.
-     * @throws IOException 
+     * @throws IOException
      */
     public void close() throws IOException {
         // Close the InputHandler, which in some cases lets the process
@@ -140,7 +142,7 @@ public class ExecutableManager {
             LOG.error("Unexpected exception while waiting for streaming binary to complete", ie);
             killProcess(process);
         }
-        
+
         // Wait for stdout thread to complete
         try {
             if (stdoutThread != null) {
@@ -151,7 +153,7 @@ public class ExecutableManager {
             LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie);
             killProcess(process);
         }
-        
+
         // Wait for stderr thread to complete
         try {
             if (stderrThread != null) {
@@ -173,12 +175,12 @@ public class ExecutableManager {
 
             // Trigger the outputHandler
             outputHandler.bindTo("", null, 0, -1);
-            
+
             // start thread to process output from executable's stdout
             stdoutThread = new ProcessOutputThread(outputHandler, poStream);
             stdoutThread.start();
         }
-        
+
         // Check if there was a problem with the managed process
         if (outerrThreadsError != null) {
             LOG.error("Output/Error thread failed with: "
@@ -191,23 +193,27 @@ public class ExecutableManager {
      *  Helper function to close input and output streams
      *  to the process and kill it
      * @param process the process to be killed
-     * @throws IOException 
+     * @throws IOException
      */
     private void killProcess(Process process) throws IOException {
-        inputHandler.close(process);
-        outputHandler.close();
-        process.destroy();
+        if (process != null) {
+            inputHandler.close(process);
+            outputHandler.close();
+            process.destroy();
+        }
     }
 
     /**
      * Set up the run-time environment of the managed process.
-     * 
+     *
      * @param pb
      *            {@link ProcessBuilder} used to exec the process
      */
     protected void setupEnvironment(ProcessBuilder pb) {
         String separator = ":";
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
         Map<String, String> env = pb.environment();
+        addJobConfToEnvironment(conf, env);
 
         // Add the current-working-directory to the $PATH
         File dir = pb.directory();
@@ -231,13 +237,48 @@ public class ExecutableManager {
         env.put(PATH, envPath);
     }
 
+    void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+        Iterator<Map.Entry<String, String>> it = conf.iterator();
+        while (it.hasNext()) {
+          Map.Entry<String, String> en = it.next();
+          String name = en.getKey();
+          //String value = (String)en.getValue(); // does not apply variable expansion
+          String value = conf.get(name); // does variable expansion
+          name = safeEnvVarName(name);
+          envPut(env, name, value);
+        }
+      }
+
+      String safeEnvVarName(String var) {
+        StringBuffer safe = new StringBuffer();
+        int len = var.length();
+        for (int i = 0; i < len; i++) {
+          char c = var.charAt(i);
+          char s;
+          if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+            s = c;
+          } else {
+            s = '_';
+          }
+          safe.append(s);
+        }
+        return safe.toString();
+      }
+
+      void envPut(Map<String, String> env, String name, String value) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Add  env entry:" + name + "=" + value);
+        }
+        env.put(name, value);
+      }
+
     /**
      * Start execution of the external process.
-     * 
+     *
      * This takes care of setting up the environment of the process and also
      * starts ProcessErrorThread to process the <code>stderr</code> of
      * the managed process.
-     * 
+     *
      * @throws IOException
      */
     protected void exec() throws IOException {
@@ -273,7 +314,7 @@ public class ExecutableManager {
             // Bind the stdout to the OutputHandler
             outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
                     0, Long.MAX_VALUE);
-            
+
             // start thread to process output from executable's stdout
             stdoutThread = new ProcessOutputThread(outputHandler, poStream);
             stdoutThread.start();
@@ -282,7 +323,7 @@ public class ExecutableManager {
 
     /**
      * Start execution of the {@link ExecutableManager}.
-     * 
+     *
      * @throws IOException
      */
     public void run() throws IOException {
@@ -291,7 +332,7 @@ public class ExecutableManager {
             // start the thread to handle input
             fileInputThread = new ProcessInputThread(inputHandler, poStream);
             fileInputThread.start();
-            
+
             // If Input type is ASYNCHRONOUS that means input to the
             // streaming binary is from a file - that means we cannot exec
             // the process till the input file is completely written. This
@@ -326,10 +367,11 @@ public class ExecutableManager {
             this.inputHandler = inputHandler;
             this.poStream = poStream;
             // the input queue from where this thread will read
-            // input tuples 
+            // input tuples
             this.binaryInputQueue = poStream.getBinaryInputQueue();
         }
 
+        @Override
         public void run() {
             try {
                 // Read tuples from the previous operator in the pipeline
@@ -339,8 +381,8 @@ public class ExecutableManager {
                     inp = binaryInputQueue.take();
                     synchronized (poStream) {
                         // notify waiting producer
-                        // the if check is to keep "findbugs" 
-                        // happy 
+                        // the if check is to keep "findbugs"
+                        // happy
                         if(inp != null)
                             poStream.notifyAll();
                     }
@@ -365,7 +407,7 @@ public class ExecutableManager {
                         Tuple t = null;
                         try {
                             t = (Tuple) inp.result;
-                            inputHandler.putNext(t);                            
+                            inputHandler.putNext(t);
                         } catch (IOException e) {
                             // if input type is synchronous then it could
                             // be related to the process terminating
@@ -391,7 +433,7 @@ public class ExecutableManager {
                                 // Generally the ProcessOutputThread would do this but now
                                 // we should do it here since neither the process nor the
                                 // ProcessOutputThread will ever be spawned
-                                Result res = new Result(POStatus.STATUS_ERR, 
+                                Result res = new Result(POStatus.STATUS_ERR,
                                         "Exception while trying to write to stream binary's input" + e.getMessage());
                                 sendOutput(poStream.getBinaryOutputQueue(), res);
                                 throw e;
@@ -402,8 +444,8 @@ public class ExecutableManager {
                     }
                 }
             } catch (Throwable t) {
-                
-                
+
+
                 // Note that an error occurred
                 outerrThreadsError = t;
                 LOG.error(t);
@@ -412,7 +454,7 @@ public class ExecutableManager {
                 } catch (IOException ioe) {
                     LOG.warn(ioe);
                 }
-           }
+            }
         }
     }
 
@@ -430,7 +472,7 @@ public class ExecutableManager {
             }
         }
     }
-    
+
     /**
      * The thread which gets output from the streaming binary and puts it onto
      * the binary output Queue of POStream
@@ -448,6 +490,7 @@ public class ExecutableManager {
             this.binaryOutputQueue = poStream.getBinaryOutputQueue();
         }
 
+        @Override
         public void run() {
             try {
                 // Read tuples from the executable and send it to
@@ -464,12 +507,12 @@ public class ExecutableManager {
                 // Note that an error occurred
                 outerrThreadsError = t;
                 LOG.error("Caught Exception in OutputHandler of Streaming binary, " +
-                		"sending error signal to pipeline", t);
+                        "sending error signal to pipeline", t);
                 // send ERROR to POStream
                 try {
                     Result res = new Result();
-                    res.result = "Error reading output from Streaming binary:" + 
-                    "'" + argvAsString + "':" + t.getMessage();
+                    res.result = "Error reading output from Streaming binary:" +
+                            "'" + argvAsString + "':" + t.getMessage();
                     res.returnStatus = POStatus.STATUS_ERR;
                     sendOutput(binaryOutputQueue, res);
                     killProcess(process);
@@ -481,7 +524,7 @@ public class ExecutableManager {
 
         void processOutput(Tuple t) {
             Result res = new Result();
-            
+
             if (t != null) {
                 // we have a valid tuple to pass back
                 res.result = t;
@@ -498,12 +541,12 @@ public class ExecutableManager {
                         killProcess(process);
                     } catch (IOException e) {
                         LOG.warn("Exception trying to kill process while processing null output " +
-                        		"from binary", e);
-                        
+                                "from binary", e);
+
                     }
                     // signal error
                     String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
-                    ie.getMessage(); 
+                            ie.getMessage();
                     LOG.error(errMsg, ie);
                     res.result = errMsg;
                     res.returnStatus = POStatus.STATUS_ERR;
@@ -515,27 +558,27 @@ public class ExecutableManager {
                     res = EOS_RESULT;
                 } else {
                     // signal Error
-                    
-                    String errMsg = "'" + argvAsString + "'" + " failed with exit status: " 
-                    + exitCode; 
+
+                    String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
+                            + exitCode;
                     LOG.error(errMsg);
                     res.result = errMsg;
                     res.returnStatus = POStatus.STATUS_ERR;
                 }
             }
             sendOutput(binaryOutputQueue, res);
-            
+
         }
     }
-    
-    
-    
+
+
+
     /**
      * Workhorse to process the stderr stream of the managed process.
-     * 
+     *
      * By default <code>ExecutableManager</code> just sends out the received
      * error message to the <code>stderr</code> of itself.
-     * 
+     *
      * @param error
      *            error message from the managed process.
      */
@@ -550,6 +593,7 @@ public class ExecutableManager {
             setDaemon(true);
         }
 
+        @Override
         public void run() {
             try {
                 String error;

Added: pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java?rev=1384546&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java (added)
+++ pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java Thu Sep 13 21:26:28 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestExecutableManager {
+
+    @Test
+    public void testSafeEnvVarName() {
+        ExecutableManager manager = new ExecutableManager();
+        assertEquals("foo", manager.safeEnvVarName("foo"));
+        assertEquals("", manager.safeEnvVarName(""));
+        assertEquals("foo_bar",manager.safeEnvVarName("foo.bar"));
+        assertEquals("foo_bar",manager.safeEnvVarName("foo$bar"));
+        assertEquals("foo_",manager.safeEnvVarName("foo "));
+    }
+
+    @Test
+    public void testAddJobConfToEnv() {
+        Configuration conf = new Configuration();
+        conf.set("foo", "bar");
+        Map<String, String> env = new HashMap<String, String>();
+        ExecutableManager manager = new ExecutableManager();
+        manager.addJobConfToEnvironment(conf, env);
+        assertTrue(env.containsKey("hadoop_tmp_dir"));
+        assertEquals("bar", env.get("foo"));
+    }
+}