You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/27 03:06:12 UTC
svn commit: r1662619 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/
Author: daijy
Date: Fri Feb 27 02:06:11 2015
New Revision: 1662619
URL: http://svn.apache.org/r1662619
Log:
PIG-4408: Merge join should support replicated join as a predecessor
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 27 02:06:11 2015
@@ -50,6 +50,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4408: Merge join should support replicated join as a predecessor (bridiver via daijy)
+
PIG-4389: Flag to run selected test suites in e2e tests (daijy)
PIG-4385: testDefaultBootup fails because it cannot find "pig.properties" (mkudlej via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 27 02:06:11 2015
@@ -123,7 +123,7 @@ public class POMergeJoin extends Physica
private String signature;
- private byte endOfRecordMark;
+ private byte endOfRecordMark = POStatus.STATUS_NULL;
// This serves as the default TupleFactory
private transient TupleFactory mTupleFactory;
@@ -159,15 +159,6 @@ public class POMergeJoin extends Physica
this.joinType = joinType;
this.leftInputSchema = leftInputSchema;
this.mergedInputSchema = mergedInputSchema;
- this.endOfRecordMark = POStatus.STATUS_EOP;
- }
-
- // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
- // This is because:
- // For MR, we send EOP at the end of every record
- // For Tez, we only use a global EOP, so send NULL for end of record
- public void setEndOfRecordMark(byte endOfRecordMark) {
- this.endOfRecordMark = endOfRecordMark;
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 27 02:06:11 2015
@@ -1134,7 +1134,6 @@ public class TezCompiler extends PhyPlan
public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
try{
- joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
int errCode=1101;
throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java Fri Feb 27 02:06:11 2015
@@ -39,7 +39,8 @@ public class MapSideMergeValidator {
if (!(lo instanceof LOFilter
|| lo instanceof LOGenerate || lo instanceof LOInnerLoad
|| lo instanceof LOLoad || lo instanceof LOSplitOutput
- || lo instanceof LOSplit
+ || lo instanceof LOSplit
+ || (lo instanceof LOJoin && ((LOJoin)lo).getJoinType() == LOJoin.JOINTYPE.REPLICATED)
|| isAcceptableSortOp(lo)
|| isAcceptableForEachOp(lo))) {
throw new LogicalToPhysicalTranslatorException(errMsg, errCode);
Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Fri Feb 27 02:06:11 2015
@@ -52,6 +52,7 @@ public class TestMergeJoin {
private static final String INPUT_FILE = "testMergeJoinInput.txt";
private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
+ private static final String INPUT_FILE3 = "testMergeJoinInput3.txt";
private PigServer pigServer;
private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@@ -78,6 +79,19 @@ public class TestMergeJoin {
Util.createInputFile(cluster, INPUT_FILE, input);
Util.createInputFile(cluster, INPUT_FILE2, new String[]{"2"});
+
+ String[] input3 = new String[LOOP_SIZE];
+ for (int i = 0; i<= LOOP_SIZE-1; i++) {
+ input3[i] = "(" + (i + 1) + ")\t + {";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input3[i] = input3[i] + "(" + j + ")";
+ if (j!=LOOP_SIZE) {
+ input3[i] = input3[i] + ",";
+ }
+ }
+ input3[i] = input3[i] + "}";
+ }
+ Util.createInputFile(cluster, INPUT_FILE3, input3);
}
@AfterClass
@@ -91,6 +105,7 @@ public class TestMergeJoin {
public void tearDown() throws Exception {
Util.deleteFile(cluster, INPUT_FILE);
Util.deleteFile(cluster, INPUT_FILE2);
+ Util.deleteFile(cluster, INPUT_FILE3);
}
@Test
@@ -145,6 +160,61 @@ public class TestMergeJoin {
Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
}
+ @Test
+ public void testMergeJoinWithReplicatedJoin() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+ DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+ pigServer.registerQuery("E = join D by A::f1, C by f1 using 'merge';");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbMergeJoin.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+ pigServer.registerQuery("E = join D by A::f1, C by f1;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+ }
+
+ @Test
+ public void testMergeJoinWithForeachFlatten() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (t:(f1:int), b:{(f1:int)});");
+ pigServer.registerQuery("C = foreach B generate flatten(t) as f1:int, flatten(b);");
+ pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+ DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbMergeJoin.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = join C by f1, A by f1;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+ }
+
@Test
public void testMergeJoinOnMultiFields() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");