You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/15 22:50:29 UTC
svn commit: r1398508 - in /pig/branches/branch-0.11: CHANGES.txt
src/org/apache/pig/PigConfiguration.java
src/org/apache/pig/impl/streaming/ExecutableManager.java
test/org/apache/pig/impl/streaming/TestExecutableManager.java
Author: jcoveney
Date: Mon Oct 15 20:50:29 2012
New Revision: 1398508
URL: http://svn.apache.org/viewvc?rev=1398508&view=rev
Log:
PIG-2971: Add new parameter to specify the streaming environment (jcoveney)
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/PigConfiguration.java
pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java
pig/branches/branch-0.11/test/org/apache/pig/impl/streaming/TestExecutableManager.java
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1398508&r1=1398507&r2=1398508&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Mon Oct 15 20:50:29 2012
@@ -295,6 +295,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2971: Add new parameter to specify the streaming environment (jcoveney)
+
PIG-2963: Illustrate command and POPackageLite (cheolsoo via jcoveney)
PIG-2961: BinInterSedesRawComparator broken by TUPLE_number patch (jcoveney)
Modified: pig/branches/branch-0.11/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/PigConfiguration.java?rev=1398508&r1=1398507&r2=1398508&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/PigConfiguration.java Mon Oct 15 20:50:29 2012
@@ -74,4 +74,11 @@ public class PigConfiguration {
* Turns off use of combiners in MapReduce jobs produced by Pig.
*/
public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
+
+ /**
+ * This key can be used to define which properties will be set in the streaming environment.
+ * Set this property to a comma-delimited list of property names, and each of those properties
+ * will be set in the environment.
+ */
+ public static final String PIG_STREAMING_ENVIRONMENT = "pig.streaming.environment";
}
Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1398508&r1=1398507&r2=1398508&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/streaming/ExecutableManager.java Mon Oct 15 20:50:29 2012
@@ -17,6 +17,8 @@
*/
package org.apache.pig.impl.streaming;
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
@@ -27,7 +29,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -57,8 +58,7 @@ import org.apache.pig.impl.util.UDFConte
* <code>stdout</code>.
*/
public class ExecutableManager {
- private static final Log LOG = LogFactory.getLog(ExecutableManager.class
- .getName());
+ private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
private static final int SUCCESS = 0;
private static final String PATH = "PATH";
private static final String BASH = "bash";
@@ -238,14 +238,21 @@ public class ExecutableManager {
}
void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
- Iterator<Map.Entry<String, String>> it = conf.iterator();
- while (it.hasNext()) {
- Map.Entry<String, String> en = it.next();
- String name = en.getKey();
- //String value = (String)en.getValue(); // does not apply variable expansion
- String value = conf.get(name); // does variable expansion
- name = safeEnvVarName(name);
- envPut(env, name, value);
+ String propsToSend = conf.get(PIG_STREAMING_ENVIRONMENT);
+ LOG.debug("Properties to ship to streaming environment set in "+PIG_STREAMING_ENVIRONMENT+": " + propsToSend);
+ if (propsToSend == null) {
+ return;
+ }
+
+ for (String prop : propsToSend.split(",")) {
+ String value = conf.get(prop);
+ if (value == null) {
+ LOG.warn("Property set in "+PIG_STREAMING_ENVIRONMENT+" not found in Configuration: " + prop);
+ continue;
+ }
+ LOG.debug("Setting property in streaming environment: " + prop);
+ envPut(env, prop, value);
+
}
}
Modified: pig/branches/branch-0.11/test/org/apache/pig/impl/streaming/TestExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/impl/streaming/TestExecutableManager.java?rev=1398508&r1=1398507&r2=1398508&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/impl/streaming/TestExecutableManager.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/impl/streaming/TestExecutableManager.java Mon Oct 15 20:50:29 2012
@@ -18,16 +18,22 @@
package org.apache.pig.impl.streaming;
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
public class TestExecutableManager {
+ private static final Random r = new Random(100L);
@Test
public void testSafeEnvVarName() {
@@ -41,12 +47,40 @@ public class TestExecutableManager {
@Test
public void testAddJobConfToEnv() {
+ StringBuilder streamingEnv = null;
Configuration conf = new Configuration();
- conf.set("foo", "bar");
+ Map<String, String> all = Maps.newHashMap();
+ for (int i = 0; i < 10000; i++) {
+ String key = RandomStringUtils.random(10);
+ String value = RandomStringUtils.random(10);
+ all.put(key, value);
+ }
+ Map<String, String> toInclude = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : all.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ conf.set(key, value);
+ if (r.nextDouble() < .5) {
+ toInclude.put(key, value);
+ if (streamingEnv == null) {
+ streamingEnv = new StringBuilder();
+ } else {
+ streamingEnv.append(",");
+ }
+ streamingEnv.append(key);
+ }
+ }
+ conf.set(PIG_STREAMING_ENVIRONMENT, streamingEnv.toString());
Map<String, String> env = new HashMap<String, String>();
ExecutableManager manager = new ExecutableManager();
manager.addJobConfToEnvironment(conf, env);
- assertTrue(env.containsKey("hadoop_tmp_dir"));
- assertEquals("bar", env.get("foo"));
+
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ String value2 = toInclude.remove(key);
+ assertEquals("Key ["+key+"] should be set in environment", value, value2);
+ }
+ assertTrue("There should be no remaining pairs in the included map",toInclude.isEmpty());
}
}