You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/23 22:37:09 UTC

svn commit: r1745286 - in /pig/branches/branch-0.16: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/test/

Author: rohini
Date: Mon May 23 22:37:09 2016
New Revision: 1745286

URL: http://svn.apache.org/viewvc?rev=1745286&view=rev
Log:
PIG-4905: Input of empty dir does not produce empty output file in Tez (rohini)

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/branches/branch-0.16/test/org/apache/pig/test/TestEmptyInputDir.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1745286&r1=1745285&r2=1745286&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Mon May 23 22:37:09 2016
@@ -129,6 +129,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4905: Input of empty dir does not produce empty output file in Tez (rohini)
+
 PIG-4576: Nightly test HCat_DDL_2 fails with TDE ON (nmaheshwari via daijy)
 
 PIG-4873: InputSplit.getLocations return null and result a NPE in Pig (daijy)

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1745286&r1=1745285&r2=1745286&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Mon May 23 22:37:09 2016
@@ -115,6 +115,11 @@ public class ParallelismSetter extends T
                     } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
                     }
+                    if (parallelism == 0) {
+                        // We need to produce empty output file.
+                        // Even if user set PARALLEL 0, mapreduce has 1 reducer
+                        parallelism = 1;
+                    }
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1745286&r1=1745285&r2=1745286&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Mon May 23 22:37:09 2016
@@ -165,6 +165,10 @@ public class TezOperDependencyParallelis
             roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
         }
 
+        if (roundedEstimatedParallelism == 0) {
+            roundedEstimatedParallelism = 1; // We need to produce empty output file
+        }
+
         return roundedEstimatedParallelism;
     }
 

Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1745286&r1=1745285&r2=1745286&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestEmptyInputDir.java Mon May 23 22:37:09 2016
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigRunner;
-import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
@@ -38,16 +38,15 @@ import org.junit.Test;
 
 public class TestEmptyInputDir {
 
-    private static MiniCluster cluster; 
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private static final String EMPTY_DIR = "emptydir";
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
     private static final String PIG_FILE = "test.pig";
 
-    
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        cluster = MiniCluster.buildCluster();
         FileSystem fs = cluster.getFileSystem();
         if (!fs.mkdirs(new Path(EMPTY_DIR))) {
             throw new Exception("failed to create empty dir");
@@ -64,7 +63,35 @@ public class TestEmptyInputDir {
     public static void tearDownAfterClass() throws Exception {
         cluster.shutDown();
     }
-    
+
+    @Test
+    public void testGroupBy() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + EMPTY_DIR + "';");
+        w.println("B = group A by $0;");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+
+            // This assert fails on 205 due to MAPREDUCE-3606
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+
     @Test
     public void testSkewedJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -73,31 +100,28 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0, A by $0 using 'skewed';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
+
             assertTrue(stats.isSuccessful());
-            // the sampler job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the sampler job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testMergeJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -106,32 +130,28 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'merge';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the indexer job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testFRJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -140,55 +160,44 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'repl';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testRegularJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "';");
         w.println("B = load '" + EMPTY_DIR + "';");
-        w.println("C = join B by $0, A by $0;");
+        w.println("C = join B by $0, A by $0 PARALLEL 0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());   
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
-            
+
+            assertTrue(stats.isSuccessful());
+
+            assertEmptyOutputFile();
+
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -203,19 +212,19 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 right outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));                  
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testLeftOuterJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -224,16 +233,28 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 left outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));                  
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
+
+    private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+        assertTrue(status.isDir());
+        assertEquals(0, status.getLen());
+        // output directory isn't empty. Has one empty file
+        FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
+        assertEquals(1, files.length);
+        assertEquals(0, files[0].getLen());
+        assertTrue(files[0].getPath().getName().startsWith("part-"));
+    }
 }