You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/27 03:06:12 UTC

svn commit: r1662619 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig...

Author: daijy
Date: Fri Feb 27 02:06:11 2015
New Revision: 1662619

URL: http://svn.apache.org/r1662619
Log:
PIG-4408: Merge join should support replicated join as a predecessor

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 27 02:06:11 2015
@@ -50,6 +50,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4408: Merge join should support replicated join as a predecessor (bridiver via daijy)
+
 PIG-4389: Flag to run selected test suites in e2e tests (daijy)
 
 PIG-4385: testDefaultBootup fails because it cannot find "pig.properties" (mkudlej via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 27 02:06:11 2015
@@ -123,7 +123,7 @@ public class POMergeJoin extends Physica
 
     private String signature;
 
-    private byte endOfRecordMark;
+    private byte endOfRecordMark = POStatus.STATUS_NULL;
 
     // This serves as the default TupleFactory
     private transient TupleFactory mTupleFactory;
@@ -159,15 +159,6 @@ public class POMergeJoin extends Physica
         this.joinType = joinType;
         this.leftInputSchema = leftInputSchema;
         this.mergedInputSchema = mergedInputSchema;
-        this.endOfRecordMark = POStatus.STATUS_EOP;
-    }
-
-    // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
-    // This is because:
-    // For MR, we send EOP at the end of every record
-    // For Tez, we only use a global EOP, so send NULL for end of record
-    public void setEndOfRecordMark(byte endOfRecordMark) {
-        this.endOfRecordMark = endOfRecordMark;
     }
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 27 02:06:11 2015
@@ -1134,7 +1134,6 @@ public class TezCompiler extends PhyPlan
     public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
 
         try{
-            joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
             if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
                 int errCode=1101;
                 throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java Fri Feb 27 02:06:11 2015
@@ -39,7 +39,8 @@ public class MapSideMergeValidator {
                 if (!(lo instanceof LOFilter
                         || lo instanceof LOGenerate || lo instanceof LOInnerLoad
                         || lo instanceof LOLoad || lo instanceof LOSplitOutput
-                        || lo instanceof LOSplit
+                        || lo instanceof LOSplit 
+                        || (lo instanceof LOJoin && ((LOJoin)lo).getJoinType() == LOJoin.JOINTYPE.REPLICATED)
                         || isAcceptableSortOp(lo)
                         || isAcceptableForEachOp(lo))) {
                     throw new LogicalToPhysicalTranslatorException(errMsg, errCode);

Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1662619&r1=1662618&r2=1662619&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Fri Feb 27 02:06:11 2015
@@ -52,6 +52,7 @@ public class TestMergeJoin {
 
     private static final String INPUT_FILE = "testMergeJoinInput.txt";
     private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
+    private static final String INPUT_FILE3 = "testMergeJoinInput3.txt";
     private PigServer pigServer;
     private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
@@ -78,6 +79,19 @@ public class TestMergeJoin {
         Util.createInputFile(cluster, INPUT_FILE, input);
         
         Util.createInputFile(cluster, INPUT_FILE2, new String[]{"2"});
+
+        String[] input3 = new String[LOOP_SIZE];
+        for (int i = 0; i<= LOOP_SIZE-1; i++) {
+            input3[i] = "(" + (i + 1) + ")\t + {";
+            for(int j=1;j<=LOOP_SIZE;j++) {
+                input3[i] = input3[i] + "(" + j + ")";
+                if (j!=LOOP_SIZE) {
+                    input3[i] = input3[i] + ",";
+                }
+            }
+            input3[i] = input3[i] + "}";
+        }
+        Util.createInputFile(cluster, INPUT_FILE3, input3);
     }
 
     @AfterClass
@@ -91,6 +105,7 @@ public class TestMergeJoin {
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
         Util.deleteFile(cluster, INPUT_FILE2);
+        Util.deleteFile(cluster, INPUT_FILE3);
     }
 
     @Test
@@ -145,6 +160,61 @@ public class TestMergeJoin {
         Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
     }
 
+    @Test
+    public void testMergeJoinWithReplicatedJoin() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+            pigServer.registerQuery("E = join D by A::f1, C by f1 using 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+            pigServer.registerQuery("E = join D by A::f1, C by f1;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinWithForeachFlatten() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (t:(f1:int), b:{(f1:int)});");
+        pigServer.registerQuery("C = foreach B generate flatten(t) as f1:int, flatten(b);");
+        pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join C by f1, A by f1;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
     @Test
     public void testMergeJoinOnMultiFields() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");