You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/07 19:36:00 UTC
svn commit: r654188 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
src/org/apache/pig/impl/eval/StreamSpec.java
test/org/apache/pig/test/TestStreaming.java
Author: olga
Date: Wed May 7 10:35:55 2008
New Revision: 654188
URL: http://svn.apache.org/viewvc?rev=654188&view=rev
Log:
PIG-230: support for shipping for multiple commands
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed May 7 10:35:55 2008
@@ -275,6 +275,9 @@
PIG-229: Proper error handling in case of deserializer failure
+ PIG-230: Handling shipment for multiple ship/cache commands (acmurthy via
+ olgan)
+
PIG-219: Change unit tests to run both local and map reduce modes (kali via gates).
PIG-202: Fix Order by so that user provided comparator func is used for quantile determination (kali via gates).
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Wed May 7 10:35:55 2008
@@ -63,6 +63,14 @@
public int mapParallelism = -1; // -1 means let hadoop decide
public int reduceParallelism = -1;
+ /**
+ * A list of configs to be merged, not overwritten ...
+ */
+ private static String[] PIG_CONFIGS_TO_MERGE =
+ {
+ "pig.streaming.cache.files",
+ "pig.streaming.ship.files",
+ };
static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
@@ -158,7 +166,7 @@
public void addInputFile(FileSpec fileSpec, EvalSpec evalSpec){
inputFileSpecs.add(fileSpec);
toMap.add(evalSpec);
- properties.putAll(evalSpec.getProperties());
+ mergeProperties(evalSpec.getProperties());
}
@@ -249,7 +257,7 @@
else
toMap.set(i, toMap.get(i).addSpec(spec));
- properties.putAll(spec.getProperties());
+ mergeProperties(spec.getProperties());
}
public void addReduceSpec(EvalSpec spec){
@@ -258,7 +266,7 @@
else
toReduce = toReduce.addSpec(spec);
- properties.putAll(spec.getProperties());
+ mergeProperties(spec.getProperties());
}
public void setProperty(String key, String value) {
@@ -272,6 +280,27 @@
public void visit(POVisitor v) {
v.visitMapreduce(this);
}
+
+ // TODO: Ugly hack! Need a better way to manage multiple properties
+ // Presumably it should be a part of Hadoop Configuration.
+ private void mergeProperties(Properties other) {
+ Properties mergedProperties = new Properties();
+
+ for (String key : PIG_CONFIGS_TO_MERGE) {
+ String value = properties.getProperty(key);
+ String otherValue = other.getProperty(key);
+
+ if (value != null && otherValue != null) {
+ mergedProperties.setProperty(key, value + ", " + otherValue);
+ }
+ }
+
+ // Copy the other one
+ properties.putAll(other);
+
+ // Now, overwrite with the merged one
+ properties.putAll(mergedProperties);
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Wed May 7 10:35:55 2008
@@ -57,6 +57,10 @@
private static void parseShipCacheSpecs(List<String> specs,
Properties properties, String property) {
+ if (specs == null || specs.size() == 0) {
+ return;
+ }
+
// Setup streaming-specific properties
StringBuffer sb = new StringBuffer();
Iterator<String> i = specs.iterator();
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed May 7 10:35:55 2008
@@ -202,7 +202,8 @@
" print STDERR \"STDERR: $_\n\";",
"}",
};
- File command = Util.createInputFile("script", "pl", script);
+ File command1 = Util.createInputFile("script", "pl", script);
+ File command2 = Util.createInputFile("script", "pl", script);
// Expected results
String[] expectedFirstFields =
@@ -213,15 +214,21 @@
// Pig query to run
pigServer.registerQuery(
- "define CMD `" + command.getName() + " foo` " +
- "ship ('" + command + "') " +
+ "define CMD1 `" + command1.getName() + " foo` " +
+ "ship ('" + command1 + "') " +
"input('foo' using " + PigStorage.class.getName() + "(',')) " +
"stderr();");
-
+ pigServer.registerQuery(
+ "define CMD2 `" + command2.getName() + " bar` " +
+ "ship ('" + command2 + "') " +
+ "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
- pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+ pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+ "through CMD1;");
+ pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
String output = "/pig/out";
pigServer.deleteFile(output);