You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2010/02/20 03:26:04 UTC
svn commit: r912064 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: olga
Date: Sat Feb 20 02:26:04 2010
New Revision: 912064
URL: http://svn.apache.org/viewvc?rev=912064&view=rev
Log:
PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
UDF (yinghe via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Feb 20 02:26:04 2010
@@ -130,6 +130,9 @@
BUG FIXES
+PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
+UDF (yinghe via olgan)
+
PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc)
PIG-1216: New load store design does not allow Pig to validate inputs and
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Sat Feb 20 02:26:04 2010
@@ -155,7 +155,7 @@
}
if (po instanceof POMapLookUp) {
- return true;
+ return check(po.getInputs().get(0));
}
if (po instanceof POProject) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sat Feb 20 02:26:04 2010
@@ -457,9 +457,12 @@
EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
checker.visit();
- AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
- accum.visit();
-
+ boolean isAccum =
+ "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true"));
+ if (isAccum) {
+ AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+ accum.visit();
+ }
return plan;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sat Feb 20 02:26:04 2010
@@ -33,6 +33,7 @@
private static final String INPUT_FILE = "AccumulatorInput.txt";
private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+ private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
@@ -50,6 +51,7 @@
@Before
public void setUp() throws Exception {
+ pigServer.getPigContext().getProperties().remove("opt.accumulator");
createFiles();
}
@@ -94,6 +96,16 @@
w.close();
Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+
+ w = new PrintWriter(new FileWriter(INPUT_FILE4));
+
+ w.println("100\thttp://ibm.com,ibm");
+ w.println("100\thttp://ibm.com,ibm");
+ w.println("200\thttp://yahoo.com,yahoo");
+ w.println("300\thttp://sun.com,sun");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
}
@After
@@ -103,7 +115,9 @@
new File(INPUT_FILE2).delete();
Util.deleteFile(cluster, INPUT_FILE2);
new File(INPUT_FILE3).delete();
- Util.deleteFile(cluster, INPUT_FILE3);
+ Util.deleteFile(cluster, INPUT_FILE3);
+ new File(INPUT_FILE4).delete();
+ Util.deleteFile(cluster, INPUT_FILE4);
}
public void testAccumBasic() throws IOException{
@@ -486,17 +500,58 @@
}
}
- // Pig 1105
+ // Pig 1105
public void testAccumCountStar() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
pigServer.registerQuery("C = group A by id;");
pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
- try {
- Iterator<Tuple> iter = pigServer.openIterator("D");
- } catch (Exception e) {
- fail("COUNT_STAR should be supported by accumulator interface");
- }
- }
-
+ try {
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ } catch (Exception e) {
+ fail("COUNT_STAR should be supported by accumulator interface");
+ }
+ }
+
+
+ public void testAccumulatorOff() throws IOException{
+ pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
+
+ pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+
+ try {
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ int c = 0;
+ while(iter.hasNext()) {
+ iter.next();
+ c++;
+ }
+ fail("Accumulator should be off.");
+ }catch(Exception e) {
+ // we should get exception
+ }
+
+ }
+
+ public void testAccumWithMap() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
+ pigServer.registerQuery("B = group A by (id, url);");
+ pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
+
+ HashMap<Integer, String> expected = new HashMap<Integer, String>();
+ expected.put(2, "http://ibm.com");
+ expected.put(1, "http://yahoo.com");
+ expected.put(1, "http://sun.com");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));
+ }
+ }
+
+
}