You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/01/28 19:27:08 UTC
svn commit: r904202 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
test/org/apache/pig/test/TestStreamingLocal.java
Author: rding
Date: Thu Jan 28 18:27:07 2010
New Revision: 904202
URL: http://svn.apache.org/viewvc?rev=904202&view=rev
Log:
PIG-1204: Pig hangs when joining two streaming relations in local mode
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 28 18:27:07 2010
@@ -78,6 +78,9 @@
BUG FIXES
+PIG-1204: Pig hangs when joining two streaming relations in local mode
+(rding)
+
PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,
FORACH (pradeepkth via gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Jan 28 18:27:07 2010
@@ -40,6 +40,8 @@
public class POStream extends PhysicalOperator {
private static final long serialVersionUID = 2L;
+
+ private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
private String executableManagerStr; // String representing ExecutableManager to use
transient private ExecutableManager executableManager; // ExecutableManager to use
@@ -155,7 +157,7 @@
// getNext() in POStream should never be called. So
// we don't need to set any flag noting we saw all output
// from binary
- r.returnStatus = POStatus.STATUS_EOP;
+ r = EOP_RESULT;
}
return(r);
}
@@ -190,7 +192,7 @@
// getNext() in POStream should never be called. So
// we don't need to set any flag noting we saw all output
// from binary
- r.returnStatus = POStatus.STATUS_EOP;
+ r = EOP_RESULT;
}
}
@@ -204,7 +206,7 @@
// So once we send this EOP down, getNext() in POStream
// should never be called. So we don't need to set any
// flag noting we saw all output from binary
- r.returnStatus = POStatus.STATUS_EOP;
+ r = EOP_RESULT;
}
return r;
} else {
@@ -218,7 +220,7 @@
// So we can send an EOP to the successor in
// the pipeline and also note this condition
// for future calls
- r.returnStatus = POStatus.STATUS_EOP;
+ r = EOP_RESULT;
allOutputFromBinaryProcessed = true;
}
return r;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Thu Jan 28 18:27:07 2010
@@ -18,6 +18,8 @@
package org.apache.pig.test;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
import junit.framework.TestCase;
@@ -285,6 +287,39 @@
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
+
+ @Test
+ public void testJoinTwoStreamingRelations()
+ throws Exception {
+ ArrayList<String> list = new ArrayList<String>();
+ for (int i=0; i<10000; i++) {
+ list.add("A," + i);
+ }
+ File input = Util.createInputFile("tmp", "", list.toArray(new String[0]));
+
+ // Expected results
+ Tuple expected = DefaultTupleFactory.getInstance().newTuple(4);
+ expected.set(0, "A");
+ expected.set(1, 0);
+ expected.set(2, "A");
+ expected.set(3, 0);
+
+ pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+ PigStorage.class.getName() + "(',') as (a0, a1);");
+ pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);");
+ pigServer.registerQuery("C = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+ PigStorage.class.getName() + "(',') as (a0, a1);");
+ pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);");
+ pigServer.registerQuery("E = join B by a0, D by a0;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ int count = 0;
+ while (iter.hasNext()) {
+ Assert.assertEquals(expected.toString(), iter.next().toString());
+ count++;
+ }
+ Assert.assertTrue(count == 1);
+ }
@Test
public void testLocalNegativeLoadStoreOptimization() throws Exception {