You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/15 22:49:51 UTC

svn commit: r1398507 - in /pig/trunk: CHANGES.txt src/org/apache/pig/PigConfiguration.java src/org/apache/pig/impl/streaming/ExecutableManager.java test/org/apache/pig/impl/streaming/TestExecutableManager.java

Author: jcoveney
Date: Mon Oct 15 20:49:50 2012
New Revision: 1398507

URL: http://svn.apache.org/viewvc?rev=1398507&view=rev
Log:
PIG-2971: Add new parameter to specify the streaming environment (jcoveney)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1398507&r1=1398506&r2=1398507&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 15 20:49:50 2012
@@ -307,6 +307,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2971: Add new parameter to specify the streaming environment (jcoveney)
+
 PIG-2963: Illustrate command and POPackageLite (cheolsoo via jcoveney)
 
 PIG-2961: BinInterSedesRawComparator broken by TUPLE_number patch (jcoveney)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1398507&r1=1398506&r2=1398507&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Oct 15 20:49:50 2012
@@ -74,4 +74,11 @@ public class PigConfiguration {
      * Turns off use of combiners in MapReduce jobs produced by Pig.
      */
     public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
+
+    /**
+     * This key can be used to define which properties will be set in the streaming environment.
+     * Just set this property to a comma-delimited list of properties to set, and those properties
+     * will be set in the environment.
+     */
+    public static final String PIG_STREAMING_ENVIRONMENT = "pig.streaming.environment";
 }

Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1398507&r1=1398506&r2=1398507&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Mon Oct 15 20:49:50 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.impl.streaming;
 
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
@@ -27,7 +29,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -57,8 +58,7 @@ import org.apache.pig.impl.util.UDFConte
  * <code>stdout</code>.
  */
 public class ExecutableManager {
-    private static final Log LOG = LogFactory.getLog(ExecutableManager.class
-            .getName());
+    private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
     private static final int SUCCESS = 0;
     private static final String PATH = "PATH";
     private static final String BASH = "bash";
@@ -238,14 +238,21 @@ public class ExecutableManager {
     }
 
     void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
-        Iterator<Map.Entry<String, String>> it = conf.iterator();
-        while (it.hasNext()) {
-          Map.Entry<String, String> en = it.next();
-          String name = en.getKey();
-          //String value = (String)en.getValue(); // does not apply variable expansion
-          String value = conf.get(name); // does variable expansion
-          name = safeEnvVarName(name);
-          envPut(env, name, value);
+        String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
+        LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
+        if (propsToSend == null) {
+            return;
+        }
+
+        for (String prop : propsToSend.split(",")) {
+            String value = conf.get(prop);
+            if (value == null) {
+                LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
+                continue;
+            }
+            LOG.debug("Setting property in streaming environment: " + prop);
+            envPut(env, prop, value);
+
         }
       }
 

Modified: pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java?rev=1398507&r1=1398506&r2=1398507&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java (original)
+++ pig/trunk/test/org/apache/pig/impl/streaming/TestExecutableManager.java Mon Oct 15 20:49:50 2012
@@ -18,16 +18,22 @@
 
 package org.apache.pig.impl.streaming;
 
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 public class TestExecutableManager {
+    private static final Random r = new Random(100L);
 
     @Test
     public void testSafeEnvVarName() {
@@ -41,12 +47,40 @@ public class TestExecutableManager {
 
     @Test
     public void testAddJobConfToEnv() {
+        StringBuilder streamingEnv = null;
         Configuration conf = new Configuration();
-        conf.set("foo", "bar");
+        Map<String, String> all = Maps.newHashMap();
+        for (int i = 0; i < 10000; i++) {
+            String key = RandomStringUtils.random(10);
+            String value = RandomStringUtils.random(10);
+            all.put(key, value);
+        }
+        Map<String, String> toInclude = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : all.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            conf.set(key, value);
+            if (r.nextDouble() < .5) {
+                toInclude.put(key, value);
+                if (streamingEnv == null) {
+                    streamingEnv = new StringBuilder();
+                } else {
+                    streamingEnv.append(",");
+                }
+                streamingEnv.append(key);
+            }
+        }
+        conf.set(PIG_STREAMING_ENVIRONMENT, streamingEnv.toString());
         Map<String, String> env = new HashMap<String, String>();
         ExecutableManager manager = new ExecutableManager();
         manager.addJobConfToEnvironment(conf, env);
-        assertTrue(env.containsKey("hadoop_tmp_dir"));
-        assertEquals("bar", env.get("foo"));
+
+        for (Map.Entry<String, String> entry : env.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            String value2 = toInclude.remove(key);
+            assertEquals("Key ["+key+"] should be set in environment", value, value2);
+        }
+        assertTrue("There should be no remaining pairs in the included map",toInclude.isEmpty());
     }
 }