You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/23 21:19:26 UTC
svn commit: r1745277 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
test/org/apache/pig/test/
Author: rohini
Date: Mon May 23 21:19:26 2016
New Revision: 1745277
URL: http://svn.apache.org/viewvc?rev=1745277&view=rev
Log:
PIG-4905: Input of empty dir does not produce empty output file in Tez (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1745277&r1=1745276&r2=1745277&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 23 21:19:26 2016
@@ -139,6 +139,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4905: Input of empty dir does not produce empty output file in Tez (rohini)
+
PIG-4576: Nightly test HCat_DDL_2 fails with TDE ON (nmaheshwari via daijy)
PIG-4873: InputSplit.getLocations return null and result a NPE in Pig (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1745277&r1=1745276&r2=1745277&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Mon May 23 21:19:26 2016
@@ -115,6 +115,11 @@ public class ParallelismSetter extends T
} else if (pc.defaultParallel != -1) {
parallelism = pc.defaultParallel;
}
+ if (parallelism == 0) {
+ // We need to produce empty output file.
+ // Even if user set PARALLEL 0, mapreduce has 1 reducer
+ parallelism = 1;
+ }
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1745277&r1=1745276&r2=1745277&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Mon May 23 21:19:26 2016
@@ -165,6 +165,10 @@ public class TezOperDependencyParallelis
roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
}
+ if (roundedEstimatedParallelism == 0) {
+ roundedEstimatedParallelism = 1; // We need to produce empty output file
+ }
+
return roundedEstimatedParallelism;
}
Modified: pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1745277&r1=1745276&r2=1745277&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java Mon May 23 21:19:26 2016
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigRunner;
-import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
@@ -38,16 +38,15 @@ import org.junit.Test;
public class TestEmptyInputDir {
- private static MiniCluster cluster;
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private static final String EMPTY_DIR = "emptydir";
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
private static final String PIG_FILE = "test.pig";
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- cluster = MiniCluster.buildCluster();
FileSystem fs = cluster.getFileSystem();
if (!fs.mkdirs(new Path(EMPTY_DIR))) {
throw new Exception("failed to create empty dir");
@@ -64,7 +63,35 @@ public class TestEmptyInputDir {
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
-
+
+ @Test
+ public void testGroupBy() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + EMPTY_DIR + "';");
+ w.println("B = group A by $0;");
+ w.println("store B into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+
+ // This assert fails on 205 due to MAPREDUCE-3606
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
@Test
public void testSkewedJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -73,31 +100,28 @@ public class TestEmptyInputDir {
w.println("C = join B by $0, A by $0 using 'skewed';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(stats.isSuccessful());
- // the sampler job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ // the sampler job has zero maps
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testMergeJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -106,32 +130,28 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'merge';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- // the indexer job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
+ assertTrue(stats.isSuccessful());
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ // the indexer job has zero maps
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testFRJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -140,55 +160,44 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'repl';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- // the indexer job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
+ assertTrue(stats.isSuccessful());
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testRegularJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "';");
w.println("B = load '" + EMPTY_DIR + "';");
- w.println("C = join B by $0, A by $0;");
+ w.println("C = join B by $0, A by $0 PARALLEL 0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
-
+
+ assertTrue(stats.isSuccessful());
+
+ assertEmptyOutputFile();
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -203,19 +212,19 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 right outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testLeftOuterJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -224,16 +233,28 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 left outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
+
+ private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+ // output directory isn't empty. Has one empty file
+ FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
+ assertEquals(1, files.length);
+ assertEquals(0, files[0].getLen());
+ assertTrue(files[0].getPath().getName().startsWith("part-"));
+ }
}