You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink "here") was not preserved in this copy.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/01/30 04:58:07 UTC
svn commit: r739161 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/builtin/ test/org/apache/pig/test/
Author: sms
Date: Fri Jan 30 03:58:07 2009
New Revision: 739161
URL: http://svn.apache.org/viewvc?rev=739161&view=rev
Log:
PIG-646: Distinct UDF should report progress
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jan 30 03:58:07 2009
@@ -390,3 +390,5 @@
PIG-631: 4 Unit test failures on Windows (daijy)
PIG-645: Streaming is broken with the latest trunk (pradeepkth)
+
+ PIG-646: Distinct UDF should report progress (sms)
Modified: hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java Fri Jan 30 03:58:07 2009
@@ -116,7 +116,7 @@
}
// report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple)
- protected void progress() {
+ public final void progress() {
if (reporter != null) reporter.progress();
else log.warn("No reporter object provided to UDF " + this.getClass().getName());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Jan 30 03:58:07 2009
@@ -127,6 +127,7 @@
Reporter reporter) throws IOException {
pigReporter.setRep(reporter);
+ PhysicalOperator.setReporter(pigReporter);
// In the case we optimize, we combine
// POPackage and POForeach - so we could get many
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Jan 30 03:58:07 2009
@@ -388,6 +388,7 @@
// which could additionally be called from close()
this.outputCollector = oc;
pigReporter.setRep(reporter);
+ PhysicalOperator.setReporter(pigReporter);
// If the keyType is not a tuple, the MapWithComparator.collect()
// would have wrapped the key into a tuple so that the
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java Fri Jan 30 03:58:07 2009
@@ -99,7 +99,7 @@
*/
@Override
public Tuple exec(Tuple input) throws IOException {
- return tupleFactory.newTuple(getDistinctFromNestedBags(input));
+ return tupleFactory.newTuple(getDistinctFromNestedBags(input, this));
}
}
@@ -110,12 +110,13 @@
*/
@Override
public DataBag exec(Tuple input) throws IOException {
- return getDistinctFromNestedBags(input);
+ return getDistinctFromNestedBags(input, this);
}
}
- static private DataBag getDistinctFromNestedBags(Tuple input) throws IOException {
+ static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
DataBag result = bagFactory.newDistinctBag();
+ long progressCounter = 0;
try {
DataBag bg = (DataBag)input.get(0);
for (Tuple tuple : bg) {
@@ -124,6 +125,10 @@
// and distinct over all tuples
for (Tuple t : (DataBag)tuple.get(0)) {
result.add(t);
+ ++progressCounter;
+ if((progressCounter % 1000) == 0){
+ evalFunc.progress();
+ }
}
}
} catch (ExecException e) {
@@ -132,12 +137,17 @@
return result;
}
- static protected DataBag getDistinct(Tuple input) throws IOException {
+ protected DataBag getDistinct(Tuple input) throws IOException {
try {
DataBag inputBg = (DataBag)input.get(0);
DataBag result = bagFactory.newDistinctBag();
+ long progressCounter = 0;
for (Tuple tuple : inputBg) {
result.add(tuple);
+ ++progressCounter;
+ if ((progressCounter % 1000) == 0) {
+ progress();
+ }
}
return result;
} catch (ExecException e) {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Jan 30 03:58:07 2009
@@ -940,6 +940,33 @@
DataBag expectedBag = Util.createBagOfOneColumn(exp);
assertEquals(expectedBag, result);
+ }
+
+ @Test
+ public void testDistinctProgressNonAlgebraic() throws Exception {
+
+ //This test is for the exec method in Distinct which is not
+ //called currently.
+
+ int inputSize = 2002;
+ Integer[] inp = new Integer[inputSize];
+ for(int i = 0; i < inputSize; i+=2) {
+ inp[i] = i/2;
+ inp[i+1] = i/2;
+ }
+
+ DataBag inputBag = Util.createBagOfOneColumn(inp);
+ EvalFunc<DataBag> distinct = new Distinct();
+ DataBag result = distinct.exec(tupleFactory.newTuple(inputBag));
+
+ Integer[] exp = new Integer[inputSize/2];
+ for(int j = 0; j < inputSize/2; ++j) {
+ exp[j] = j;
+ }
+
+ DataBag expectedBag = Util.createBagOfOneColumn(exp);
+ assertEquals(expectedBag, result);
+
}
@Test
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri Jan 30 03:58:07 2009
@@ -42,6 +42,7 @@
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.Distinct;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.TextLoader;
import org.apache.pig.data.*;
@@ -327,13 +328,13 @@
- /*
+ /*
@Test
public void testSort() throws Exception{
testSortDistinct(false, false);
- }
- */
-
+ }
+ */
+
@Test
public void testSortWithUDF() throws Exception{
testSortDistinct(false, true);
@@ -1043,6 +1044,49 @@
Util.deleteFile(cluster, "table1");
Util.deleteFile(cluster, "table2");
}
+
+ @Test
+ public void testAlgebraicDistinctProgress() throws Exception {
+ //creating a test input of larger than 1000 to make
+ //sure that progress kicks in. The only way to test this
+ //is to add a log statement to the getDistinct
+ //method in Distinct.java. There is no automated mechanism
+ //to check this from pig
+ int inputSize = 4004;
+ Integer[] inp = new Integer[inputSize];
+ String[] inpString = new String[inputSize];
+ for(int i = 0; i < inputSize; i+=2) {
+ inp[i] = i/2;
+ inp[i+1] = i/2;
+ inpString[i] = new Integer(i/2).toString();
+ inpString[i+1] = new Integer(i/2).toString();
+ }
+
+
+ DataBag inputBag = Util.createBagOfOneColumn(inp);
+ Util.createInputFile(cluster, "table", inpString);
+
+ pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
+ pigServer.registerQuery("b = group a ALL;");
+ pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
+ Iterator<Tuple> it = pigServer.openIterator("c");
+
+ Integer[] exp = new Integer[inputSize/2];
+ for(int j = 0; j < inputSize/2; ++j) {
+ exp[j] = j;
+ }
+
+ DataBag expectedBag = Util.createBagOfOneColumn(exp);
+
+ while(it.hasNext()) {
+ Tuple tup = it.next();
+ Long resultBagSize = (Long)tup.get(0);
+ assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
+ }
+
+ Util.deleteFile(cluster, "table");
+ }
+
}