Posted to commits@pig.apache.org by ol...@apache.org on 2010/02/20 03:26:04 UTC

svn commit: r912064 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: olga
Date: Sat Feb 20 02:26:04 2010
New Revision: 912064

URL: http://svn.apache.org/viewvc?rev=912064&view=rev
Log:
PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
UDF (yinghe via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
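
For context, PIG-1241 covers plans where a map lookup (Pig's '#' operator) is
applied to the value of a UDF that does not implement the Accumulator
interface; before this fix the optimizer accepted any map lookup and could
wrongly switch the whole foreach into accumulative mode. Below is a minimal
embedded-Java sketch of that pattern, modeled on the new testAccumWithMap case
further down (URLPARSE is a UDF from Pig's test utilities and the input file
name is the test's placeholder data, so treat this as illustrative rather than
a ready-to-run recipe):

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class MapLookupScenario {
        public static void main(String[] args) throws Exception {
            // Local mode keeps the sketch self-contained; the input is a
            // tab-separated file of (id, "url,name") records.
            PigServer pig = new PigServer(ExecType.LOCAL, new Properties());

            // Mix an accumulative UDF (COUNT) with a map lookup on the output
            // of URLPARSE, a test-only UDF that is not accumulative. Before
            // this commit the optimizer accepted the map lookup and turned
            // accumulation on for the whole foreach.
            pig.registerQuery("A = load 'AccumulatorInput4.txt' as (id, url);");
            pig.registerQuery("B = group A by (id, url);");
            pig.registerQuery("C = foreach B generate COUNT(A), "
                    + "org.apache.pig.test.utils.URLPARSE(group.url)#'url';");

            Iterator<Tuple> it = pig.openIterator("C");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }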

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Feb 20 02:26:04 2010
@@ -130,6 +130,9 @@
 
 BUG FIXES
 
+PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
+UDF (yinghe via olgan)
+
 PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc)
 
 PIG-1216: New load store design does not allow Pig to validate inputs and

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Sat Feb 20 02:26:04 2010
@@ -155,7 +155,7 @@
         }
         
         if (po instanceof POMapLookUp) {
-            return true;
+            return check(po.getInputs().get(0));
         }
         
         if (po instanceof POProject) {
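
The one-line change above stops the optimizer from treating every POMapLookUp
as accumulator-friendly: the lookup now qualifies only if the operator feeding
it qualifies as well. Here is a simplified, self-contained sketch of that
recursion (the Op classes are hypothetical stand-ins, not the real Pig
physical operators):

    import java.util.Collections;
    import java.util.List;

    // Hypothetical stand-ins for Pig's physical operators, used only to model
    // the shape of the check; they are not the real Pig classes.
    abstract class Op {
        private final List<Op> inputs;
        Op(List<Op> inputs) { this.inputs = inputs; }
        List<Op> getInputs() { return inputs; }
    }
    class MapLookupOp extends Op {
        MapLookupOp(Op in) { super(Collections.singletonList(in)); }
    }
    class ProjectOp extends Op {
        ProjectOp() { super(Collections.<Op>emptyList()); }
    }
    class UdfOp extends Op {
        private final boolean accumulative;
        UdfOp(boolean accumulative) {
            super(Collections.<Op>emptyList());
            this.accumulative = accumulative;
        }
        boolean isAccumulative() { return accumulative; }
    }

    public class AccumulatorCheckSketch {
        // A map lookup is only accumulator-friendly if whatever produces the
        // map is; before the fix, the first branch simply returned true.
        static boolean check(Op po) {
            if (po instanceof MapLookupOp) {
                return check(po.getInputs().get(0));   // was: return true;
            }
            if (po instanceof ProjectOp) {
                return true;    // e.g. a projection of the group key
            }
            if (po instanceof UdfOp) {
                return ((UdfOp) po).isAccumulative();
            }
            return false;       // conservatively reject anything else
        }

        public static void main(String[] args) {
            System.out.println(check(new MapLookupOp(new ProjectOp())));    // true
            System.out.println(check(new MapLookupOp(new UdfOp(false))));   // false: the PIG-1241 case
        }
    }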

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sat Feb 20 02:26:04 2010
@@ -457,9 +457,12 @@
         EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
         checker.visit();
         
-        AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
-        accum.visit();
-        
+        boolean isAccum = 
+            "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true"));
+        if (isAccum) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+            accum.visit();
+        }
         return plan;
     }
     

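The launcher change adds an opt.accumulator switch, defaulting to true, so the
AccumulatorOptimizer can be disabled outright. A minimal sketch of flipping
that switch from an embedded PigServer program, using the same property call
the new testAccumulatorOff test below uses (aliases and paths here are
placeholders):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class AccumulatorSwitch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL, new Properties());

            // Same property the new MapReduceLauncher code reads; the
            // optimizer stays on unless this is explicitly "false".
            pig.getPigContext().getProperties()
               .setProperty("opt.accumulator", "false");

            // Placeholder pipeline; any group/foreach works for the demo.
            pig.registerQuery("A = load 'input.txt' as (id:int, v:chararray);");
            pig.registerQuery("B = group A by id;");
            pig.registerQuery("C = foreach B generate group, COUNT(A);");
            pig.store("C", "accumulator_off_out");
        }
    }
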
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sat Feb 20 02:26:04 2010
@@ -33,6 +33,7 @@
     private static final String INPUT_FILE = "AccumulatorInput.txt";
     private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
     private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+    private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
  
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -50,6 +51,7 @@
     
     @Before
     public void setUp() throws Exception {
+        pigServer.getPigContext().getProperties().remove("opt.accumulator");
         createFiles();
     }
 
@@ -94,6 +96,16 @@
         w.close();   
         
         Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE4));
+        
+        w.println("100\thttp://ibm.com,ibm");    	
+        w.println("100\thttp://ibm.com,ibm");
+        w.println("200\thttp://yahoo.com,yahoo");    	
+        w.println("300\thttp://sun.com,sun");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
     }
     
     @After
@@ -103,7 +115,9 @@
         new File(INPUT_FILE2).delete();
         Util.deleteFile(cluster, INPUT_FILE2);
         new File(INPUT_FILE3).delete();
-        Util.deleteFile(cluster, INPUT_FILE3);
+        Util.deleteFile(cluster, INPUT_FILE3);        
+        new File(INPUT_FILE4).delete();
+        Util.deleteFile(cluster, INPUT_FILE4);
     }
     
     public void testAccumBasic() throws IOException{
@@ -486,17 +500,58 @@
         }    
     }
 
-	// Pig 1105
+    // Pig 1105
     public void testAccumCountStar() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
         pigServer.registerQuery("C = group A by id;");
         pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
 
-		try {
-			Iterator<Tuple> iter = pigServer.openIterator("D");
-		} catch (Exception e) {
-			fail("COUNT_STAR should be supported by accumulator interface");
-		}      
-	}
-	
+        try {
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+        } catch (Exception e) {
+            fail("COUNT_STAR should be supported by accumulator interface");
+        }      
+    }
+    
+    
+    public void testAccumulatorOff() throws IOException{
+        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
+        
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+        
+        try {
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            int c = 0;
+            while(iter.hasNext()) {
+                iter.next();
+                c++;
+            }
+            fail("Accumulator should be off.");
+        }catch(Exception e) {
+            // we should get exception
+        }
+        
+    }              
+    
+    public void testAccumWithMap() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
+        pigServer.registerQuery("B = group A by (id, url);");
+        pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");                     
+
+        HashMap<Integer, String> expected = new HashMap<Integer, String>();
+        expected.put(2, "http://ibm.com");
+        expected.put(1, "http://yahoo.com");
+        expected.put(1, "http://sun.com");        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));                
+        }                                   
+    }        
+    
+
 }
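
For reference, the counts that testAccumWithMap's input implies can be worked
out directly from the four lines written in createFiles() above; this
stand-alone sketch (not part of the commit) derives them:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ExpectedCounts {
        public static void main(String[] args) {
            // The same four records the test writes to AccumulatorInput4.txt.
            String[] lines = {
                "100\thttp://ibm.com,ibm",
                "100\thttp://ibm.com,ibm",
                "200\thttp://yahoo.com,yahoo",
                "300\thttp://sun.com,sun"
            };
            Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
            for (String line : lines) {
                // The second column holds "url,name"; keep only the url,
                // which is what the URLPARSE(...)#'url' lookup extracts.
                String url = line.split("\t")[1].split(",")[0];
                Integer c = counts.get(url);
                counts.put(url, c == null ? 1 : c + 1);
            }
            // Prints {http://ibm.com=2, http://yahoo.com=1, http://sun.com=1}
            System.out.println(counts);
        }
    }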