Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/07 19:36:00 UTC

svn commit: r654188 - in /incubator/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java src/org/apache/pig/impl/eval/StreamSpec.java test/org/apache/pig/test/TestStreaming.java

Author: olga
Date: Wed May  7 10:35:55 2008
New Revision: 654188

URL: http://svn.apache.org/viewvc?rev=654188&view=rev
Log:
PIG-230: support for shipping for multiple commands
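
The issue this addresses: each streaming spec hands its ship/cache file list to POMapreduce inside a Properties object, and folding those in with a plain putAll() let the last spec's list replace the earlier ones. A minimal standalone sketch of that overwrite, runnable outside Pig (the property key comes from the patch below; the class name and script names are made up for illustration):

    import java.util.Properties;

    public class OverwriteDemo {
        public static void main(String[] args) {
            Properties job = new Properties();
            job.setProperty("pig.streaming.ship.files", "script1.pl");

            // A second streaming command contributes its own ship list.
            Properties secondSpec = new Properties();
            secondSpec.setProperty("pig.streaming.ship.files", "script2.pl");

            // Plain putAll(): last writer wins, so the first ship list is lost.
            job.putAll(secondSpec);
            System.out.println(job.getProperty("pig.streaming.ship.files")); // script2.pl
        }
    }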

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed May  7 10:35:55 2008
@@ -275,6 +275,9 @@
     
     PIG-229: Proper error handling in case of deserializer failure
 
+    PIG-230: Handling shipment for multiple ship/cache commands (acmurthy via
+    olgan)
+
 	PIG-219: Change unit tests to run both local and map reduce modes (kali via gates).
 
 	PIG-202: Fix Order by so that user provided comparator func is used for quantile determination (kali via gates).

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Wed May  7 10:35:55 2008
@@ -63,6 +63,14 @@
     public int                     mapParallelism       = -1;     // -1 means let hadoop decide
     public int                     reduceParallelism    = -1;
 
+    /**
+     * A list of configs to be merged, not overwritten ...
+     */
+    private static String[] PIG_CONFIGS_TO_MERGE = 
+        { 
+            "pig.streaming.cache.files",
+            "pig.streaming.ship.files",
+        };
     
     static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
 
@@ -158,7 +166,7 @@
     public void addInputFile(FileSpec fileSpec, EvalSpec evalSpec){
         inputFileSpecs.add(fileSpec);
         toMap.add(evalSpec);
-        properties.putAll(evalSpec.getProperties());
+        mergeProperties(evalSpec.getProperties());
     }
     
     
@@ -249,7 +257,7 @@
         else
             toMap.set(i, toMap.get(i).addSpec(spec));
         
-        properties.putAll(spec.getProperties());
+        mergeProperties(spec.getProperties());
     }
     
     public void addReduceSpec(EvalSpec spec){
@@ -258,7 +266,7 @@
         else
             toReduce = toReduce.addSpec(spec);
         
-        properties.putAll(spec.getProperties());
+        mergeProperties(spec.getProperties());
     }
     
     public void setProperty(String key, String value) {
@@ -272,6 +280,27 @@
     public void visit(POVisitor v) {
         v.visitMapreduce(this);
     }
+    
+    // TODO: Ugly hack! Need a better way to manage multiple properties 
+    // Presumably it should be a part of Hadoop Configuration.
+    private void mergeProperties(Properties other) {
+        Properties mergedProperties = new Properties();
+        
+        for (String key : PIG_CONFIGS_TO_MERGE) {
+            String value = properties.getProperty(key);
+            String otherValue = other.getProperty(key);
+            
+            if (value != null && otherValue != null) {
+                mergedProperties.setProperty(key, value + ", " + otherValue);
+            }
+        }
+        
+        // Copy the other one
+        properties.putAll(other);
+        
+        // Now, overwrite with the merged one
+        properties.putAll(mergedProperties);
+    }
 }
 
 

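The merge above only special-cases the two streaming file lists; every other key still follows last-writer-wins via putAll(). A self-contained sketch of the same logic (the key names and the ", " separator mirror the committed mergeProperties; the class and method names here are invented for illustration):

    import java.util.Properties;

    public class MergeDemo {
        private static final String[] PIG_CONFIGS_TO_MERGE = {
            "pig.streaming.cache.files",
            "pig.streaming.ship.files",
        };

        // Concatenate the values of the "mergeable" keys; let everything else be overwritten.
        static void merge(Properties target, Properties other) {
            Properties merged = new Properties();
            for (String key : PIG_CONFIGS_TO_MERGE) {
                String value = target.getProperty(key);
                String otherValue = other.getProperty(key);
                if (value != null && otherValue != null) {
                    merged.setProperty(key, value + ", " + otherValue);
                }
            }
            target.putAll(other);   // ordinary keys: last writer wins
            target.putAll(merged);  // ship/cache lists: merged value wins
        }

        public static void main(String[] args) {
            Properties p1 = new Properties();
            p1.setProperty("pig.streaming.ship.files", "script1.pl");
            Properties p2 = new Properties();
            p2.setProperty("pig.streaming.ship.files", "script2.pl");
            merge(p1, p2);
            // Prints: script1.pl, script2.pl -- both commands' files survive.
            System.out.println(p1.getProperty("pig.streaming.ship.files"));
        }
    }
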
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Wed May  7 10:35:55 2008
@@ -57,6 +57,10 @@
     
     private static void parseShipCacheSpecs(List<String> specs, 
             Properties properties, String property) {
+        if (specs == null || specs.size() == 0) {
+            return;
+        }
+        
         // Setup streaming-specific properties
         StringBuffer sb = new StringBuffer();
         Iterator<String> i = specs.iterator();

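The guard presumably keeps a stream definition with no ship()/cache() files from registering an empty property value, which the merge in POMapreduce would then concatenate into the real lists. A hypothetical, self-contained rendering of the helper (only the signature and the early return are taken from the hunk; the diff elides the rest of the body, so the joining loop below is an assumption about what it roughly does):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;

    class ShipCacheSpecsSketch {
        // Hypothetical reconstruction: join the specs into one comma-separated
        // property value, but bail out early when there is nothing to record.
        static void parseShipCacheSpecs(List<String> specs,
                Properties properties, String property) {
            if (specs == null || specs.size() == 0) {
                return; // leave the property unset rather than setting it to ""
            }

            StringBuffer sb = new StringBuffer();
            Iterator<String> i = specs.iterator();
            while (i.hasNext()) {
                sb.append(i.next());
                if (i.hasNext()) {
                    sb.append(", ");
                }
            }
            properties.setProperty(property, sb.toString());
        }
    }
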
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654188&r1=654187&r2=654188&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed May  7 10:35:55 2008
@@ -202,7 +202,8 @@
                           "  print STDERR \"STDERR: $_\n\";",
                           "}",
                          };
-        File command = Util.createInputFile("script", "pl", script);
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
         
         // Expected results
         String[] expectedFirstFields = 
@@ -213,15 +214,21 @@
 
         // Pig query to run
         pigServer.registerQuery(
-                "define CMD `" + command.getName() + " foo` " +
-                "ship ('" + command + "') " +
+                "define CMD1 `" + command1.getName() + " foo` " +
+                "ship ('" + command1 + "') " +
                 "input('foo' using " + PigStorage.class.getName() + "(',')) " +
                 "stderr();"); 
-
+        pigServer.registerQuery(
+                "define CMD2 `" + command2.getName() + " bar` " +
+                "ship ('" + command2 + "') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
         pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
                                 PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
-        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+        		                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
 
         String output = "/pig/out";
         pigServer.deleteFile(output);