You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/07 02:18:15 UTC
svn commit: r1565509 [3/3] - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executione...
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java Fri Feb 7 01:18:14 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -59,9 +60,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestPigRunner {
-
- private static MiniCluster cluster;
-
+
+ private static MiniCluster cluster;
+
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
private static final String PIG_FILE = "test.pig";
@@ -83,13 +84,13 @@ public class TestPigRunner {
public static void tearDownAfterClass() throws Exception {
new File(INPUT_FILE).delete();
cluster.shutDown();
- }
+ }
@Before
public void setUp() {
deleteAll(new File(OUTPUT_FILE));
- }
-
+ }
+
@Test
public void testErrorLogFile() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -97,22 +98,22 @@ public class TestPigRunner {
w.println("B = foreach A generate StringSize(a0);");
w.println("store B into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { "-x", "local", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(!stats.isSuccessful());
-
+
Properties props = stats.getPigProperties();
String logfile = props.getProperty("pig.logfile");
File f = new File(logfile);
- assertTrue(f.exists());
+ assertTrue(f.exists());
} finally {
- new File(PIG_FILE).delete();
+ new File(PIG_FILE).delete();
}
}
-
+
@Test
public void testErrorLogFile2() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -120,29 +121,29 @@ public class TestPigRunner {
w.println("B = foreach A generate StringSize(a0);");
w.println("store B into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { "-M", "-x", "local", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(!stats.isSuccessful());
-
+
Properties props = stats.getPigProperties();
- // If test on nfs, the pig script complaining "output" exists
+ // If test on nfs, the pig script complaining "output" exists
// and does not actually launch the job. This could due to a mapreduce
- // bug which removing file before closing it.
+ // bug which removing file before closing it.
// If this happens, props is null because we only set pigContext before
// launching job.
if (props!=null) {
String logfile = props.getProperty("pig.logfile");
File f = new File(logfile);
- assertTrue(f.exists());
+ assertTrue(f.exists());
}
} finally {
new File(PIG_FILE).delete();
}
}
-
+
@Test
public void simpleTest() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -151,32 +152,32 @@ public class TestPigRunner {
w.println("C = foreach B generate group, COUNT(A);");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals(OUTPUT_FILE, name);
assertEquals(12, stats.getBytesWritten());
- assertEquals(3, stats.getRecordWritten());
-
+ assertEquals(3, stats.getRecordWritten());
+
assertEquals("A,B,C",
((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
-
+
Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
- assertTrue(conf.getBoolean("stop.on.failure", false));
+ assertTrue(conf.getBoolean("stop.on.failure", false));
assertTrue(!conf.getBoolean("aggregate.warning", true));
- assertTrue(!conf.getBoolean("opt.multiquery", true));
+ assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void scriptsInDfsTest() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -187,19 +188,19 @@ public class TestPigRunner {
w.close();
Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);
Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
-
+
try {
String[] args = { inputInDfs.toString() };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertTrue(stats.getJobGraph().size() == 1);
String name = stats.getOutputNames().get(0);
assertEquals(OUTPUT_FILE, name);
assertEquals(12, stats.getBytesWritten());
- assertEquals(3, stats.getRecordWritten());
-
+ assertEquals(3, stats.getRecordWritten());
+
assertEquals("A,B,C",
((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
} finally {
@@ -208,7 +209,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void orderByTest() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -231,22 +232,22 @@ public class TestPigRunner {
assertEquals(OUTPUT_FILE, stats.getOutputNames().get(0));
assertEquals(2, stats.getRecordWritten());
assertEquals(12, stats.getBytesWritten());
-
+
assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
0)).getAlias());
assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
js).get(0)).getAlias());
- assertEquals("B", js.getAlias());
+ assertEquals("B", js.getAlias());
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void simpleMultiQueryTest() throws Exception {
final String OUTPUT_FILE_2 = "output2";
-
+
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w.println("B = filter A by a0 >= 4;");
@@ -254,7 +255,7 @@ public class TestPigRunner {
w.println("store B into '" + OUTPUT_FILE_2 + "';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
@@ -279,8 +280,8 @@ public class TestPigRunner {
assertEquals(3, stats.getNumberRecords(fname));
} else {
assertEquals(2, stats.getNumberRecords(fname));
- }
- }
+ }
+ }
assertEquals("A,B,C",
((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
} finally {
@@ -289,11 +290,11 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE_2);
}
}
-
+
@Test
public void simpleMultiQueryTest2() throws Exception {
final String OUTPUT_FILE_2 = "output2";
-
+
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w.println("B = filter A by a0 >= 4;");
@@ -303,7 +304,7 @@ public class TestPigRunner {
w.println("store B into '" + OUTPUT_FILE_2 + "';");
w.println("store E into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
@@ -324,14 +325,14 @@ public class TestPigRunner {
assertEquals((numOfRecords1 * numOfCharsPerRecord1) + (numOfRecords2 * numOfCharsPerRecord2),
stats.getBytesWritten());
assertTrue(stats.getOutputNames().size() == 2);
- for (String fname : stats.getOutputNames()) {
+ for (String fname : stats.getOutputNames()) {
assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
if (fname.equals(OUTPUT_FILE)) {
assertEquals(2, stats.getNumberRecords(fname));
} else {
assertEquals(2, stats.getNumberRecords(fname));
- }
- }
+ }
+ }
assertEquals("A,B,C,D,E",
((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
} finally {
@@ -385,26 +386,26 @@ public class TestPigRunner {
// TODO: error message has changed. Need to catch the new message generated from the
// new parser.
// assertTrue(stats.getErrorCode() == 1000);
-// assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}",
+// assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}",
// stats.getErrorMessage());
}
-
+
@Test
public void simpleNegativeTest2() throws Exception {
String[] args = { "-c", "-e", "this is a test" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
}
@Test
public void simpleNegativeTest3() throws Exception {
String[] args = { "-c", "-y" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
- assertEquals("Found unknown option (-y) at position 2",
+ assertEquals("Found unknown option (-y) at position 2",
stats.getErrorMessage());
}
-
+
@Test
public void NagetiveTest() throws Exception {
final String OUTPUT_FILE_2 = "output2";
@@ -417,12 +418,12 @@ public class TestPigRunner {
w.println("D = load '" + OUTPUT_FILE_2 + "';");
w.println("E = stream D through `false`;");
w.println("store E into 'ee';");
- w.close();
-
+ w.close();
+
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, null);
- assertTrue(!stats.isSuccessful());
+ PigStats stats = PigRunner.run(args, null);
+ assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
assertTrue(stats.getJobGraph().size() == 2);
JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
@@ -443,7 +444,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE_2);
}
}
-
+
@Test
public void testIsTempFile() throws Exception {
PigContext context = new PigContext(ExecType.LOCAL, new Properties());
@@ -453,7 +454,7 @@ public class TestPigRunner {
assertTrue("not a temp file: " + file, PigStatsUtil.isTempFile(file));
}
}
-
+
@Test
public void testCounterName() throws Exception {
String s = "jdbc:hsqldb:file:/tmp/batchtest;hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
@@ -466,7 +467,7 @@ public class TestPigRunner {
name = MRPigStatsUtil.getMultiInputsCounterName(s, 2);
assertEquals(MRPigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "_2_batchtest*.txt", name);
}
-
+
@Test
public void testLongCounterName() throws Exception {
// Pig now restricts the string size of its counter name to less than 64 characters.
@@ -479,20 +480,20 @@ public class TestPigRunner {
w.close();
String longfilename = "longlonglonglonglonglonglonglonglonglonglonglongfilefilefilename";
Util.copyFromLocalToCluster(cluster, "myinputfile", longfilename);
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = load '" + longfilename + "' as (a0:int, a1:int, a2:int);");
w1.println("C = join A by a0, B by a0;");
w1.println("store C into '" + OUTPUT_FILE + "';");
w1.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
@@ -504,7 +505,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testDuplicateCounterName() throws Exception {
// Pig now restricts the string size of its counter name to less than 64 characters.
@@ -514,20 +515,20 @@ public class TestPigRunner {
w.close();
String samefilename = "tmp/input";
Util.copyFromLocalToCluster(cluster, "myinputfile", samefilename);
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = load '" + samefilename + "' as (a0:int, a1:int, a2:int);");
w1.println("C = join A by a0, B by a0;");
w1.println("store C into '" + OUTPUT_FILE + "';");
w1.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
@@ -543,23 +544,23 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testDuplicateCounterName2() throws Exception {
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = filter A by a0 > 3;");
w1.println("store A into 'output';");
w1.println("store B into 'tmp/output';");
w1.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
@@ -576,7 +577,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, "tmp/output");
}
}
-
+
@Test
public void testRegisterExternalJar() throws Exception {
String[] args = { "-Dpig.additional.jars=pig-withouthadoop.jar",
@@ -592,7 +593,7 @@ public class TestPigRunner {
assertTrue(ctx.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
assertTrue("default", ctx.getProperties().getProperty("mapred.job.queue.name")!=null && ctx.getProperties().getProperty("mapred.job.queue.name").equals("default")||
ctx.getProperties().getProperty("mapreduce.job.queuename")!=null && ctx.getProperties().getProperty("mapreduce.job.queuename").equals("default"));
-
+
}
@Test
@@ -605,11 +606,11 @@ public class TestPigRunner {
w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");
w.println("store A into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
- assertTrue(stats.isSuccessful());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ assertTrue(stats.isSuccessful());
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -621,29 +622,29 @@ public class TestPigRunner {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("fs -mv nonexist.file dummy.file");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
} finally {
new File(PIG_FILE).delete();
}
}
-
+
@Test // PIG-2006
public void testEmptyFile() throws IOException {
File f1 = new File( PIG_FILE );
-
+
FileWriter fw1 = new FileWriter(f1);
fw1.close();
try {
String[] args = { "-x", "local", "-c", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
Assert.assertTrue(stats.isSuccessful());
Assert.assertEquals( 0, stats.getReturnCode() );
} finally {
@@ -651,7 +652,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void returnCodeTest() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -661,21 +662,21 @@ public class TestPigRunner {
w.println("D = join C by $0, B by $0;");
w.println("store D into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() != 0);
assertTrue(stats.getOutputStats().size() == 0);
-
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void returnCodeTest2() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -684,43 +685,43 @@ public class TestPigRunner {
w.println("C = join B by b0, A by a0 using 'repl';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() != 0);
assertTrue(stats.getOutputStats().size() == 0);
-
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
-
+
+
@Test //PIG-1893
public void testEmptyFileCounter() throws Exception {
-
+
PrintWriter w = new PrintWriter(new FileWriter("myinputfile"));
w.close();
-
+
Util.copyFromLocalToCluster(cluster, "myinputfile", "1.txt");
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = load '1.txt' as (a0:int, a1:int, a2:int);");
w1.println("C = join A by a0, B by a0;");
w1.println("store C into '" + OUTPUT_FILE + "';");
w1.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
@@ -736,7 +737,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test //PIG-1893
public void testEmptyFileCounter2() throws Exception {
@@ -746,13 +747,13 @@ public class TestPigRunner {
w1.println("store A into '" + OUTPUT_FILE + "';");
w1.println("store B into 'output2';");
w1.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
@@ -769,22 +770,22 @@ public class TestPigRunner {
Util.deleteFile(cluster, "output2");
}
}
-
+
@Test // PIG-2208: Restrict number of PIG generated Haddop counters
- public void testDisablePigCounters() throws Exception {
+ public void testDisablePigCounters() throws Exception {
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("C = join A by a0, B by a0;");
w1.println("store C into '" + OUTPUT_FILE + "';");
w1.close();
-
+
try {
String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
@@ -792,7 +793,7 @@ public class TestPigRunner {
// the multi-input counters are disabled
assertEquals(-1, instats.getNumberRecords());
}
-
+
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(1, outputs.size());
OutputStats outstats = outputs.get(0);
@@ -802,11 +803,11 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test //Pig-2358
public void testGetHadoopCounters() throws Exception {
final String OUTPUT_FILE_2 = "output2";
-
+
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w.println("B = filter A by a0 >= 4;");
@@ -816,11 +817,11 @@ public class TestPigRunner {
w.println("store B into '" + OUTPUT_FILE_2 + "';");
w.println("store E into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
@@ -832,11 +833,11 @@ public class TestPigRunner {
MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-
+
// Skip for hadoop 20.203+, See PIG-2446
if (Util.isHadoop203plus())
return;
-
+
assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
MRPigStatsUtil.HDFS_BYTES_READ).getValue());
} finally {
@@ -845,23 +846,23 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE_2);
}
}
-
+
@Test // PIG-2208: Restrict number of PIG generated Haddop counters
public void testDisablePigCounters2() throws Exception {
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
w1.println("B = filter A by a0 > 3;");
w1.println("store A into 'output';");
w1.println("store B into 'tmp/output';");
w1.close();
-
+
try {
String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(stats.isSuccessful());
-
+
assertEquals(1, stats.getNumberJobs());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
@@ -869,7 +870,7 @@ public class TestPigRunner {
// the multi-output counters are disabled
assertEquals(-1, outstats.getNumberRecords());
}
-
+
List<InputStats> inputs = stats.getInputStats();
assertEquals(1, inputs.size());
InputStats instats = inputs.get(0);
@@ -880,7 +881,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, "tmp/output");
}
}
-
+
/**
* PIG-2780: In this test case, Pig submits three jobs at the same time and
* one of them will fail due to nonexistent input file. If users enable
@@ -889,7 +890,7 @@ public class TestPigRunner {
*/
@Test
public void testStopOnFailure() throws Exception {
-
+
PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
w1.println("A1 = load '" + INPUT_FILE + "';");
w1.println("B1 = load 'nonexist';");
@@ -900,13 +901,13 @@ public class TestPigRunner {
w1.println("ret = union A2,B2,C2;");
w1.println("store ret into 'tmp/output';");
w1.close();
-
+
try {
String[] args = { "-F", PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener());
-
+
assertTrue(!stats.isSuccessful());
-
+
int successfulJobs = 0;
Iterator<Operator> it = stats.getJobGraph().getOperators();
while (it.hasNext()){
@@ -914,10 +915,10 @@ public class TestPigRunner {
if (js.isSuccessful())
successfulJobs++;
}
-
+
// we should have less than 2 successful jobs
assertTrue("Should have less than 2 successful jobs", successfulJobs < 2);
-
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -925,9 +926,9 @@ public class TestPigRunner {
}
}
public static class TestNotificationListener implements PigProgressNotificationListener {
-
+
private Map<String, int[]> numMap = new HashMap<String, int[]>();
-
+
private static final int JobsToLaunch = 0;
private static final int JobsSubmitted = 1;
private static final int JobStarted = 2;
@@ -940,8 +941,8 @@ public class TestPigRunner {
}
@Override
- public void launchStartedNotification(String id, int numJobsToLaunch) {
- System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);
+ public void launchStartedNotification(String id, int numJobsToLaunch) {
+ System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);
int[] nums = new int[4];
numMap.put(id, nums);
nums[JobsToLaunch] = numJobsToLaunch;
@@ -949,19 +950,19 @@ public class TestPigRunner {
@Override
public void jobFailedNotification(String id, JobStats jobStats) {
- System.out.println("id: " + id + " job failed: " + jobStats.getJobId());
+ System.out.println("id: " + id + " job failed: " + jobStats.getJobId());
}
@Override
public void jobFinishedNotification(String id, JobStats jobStats) {
- System.out.println("id: " + id + " job finished: " + jobStats.getJobId());
+ System.out.println("id: " + id + " job finished: " + jobStats.getJobId());
int[] nums = numMap.get(id);
- nums[JobFinished]++;
+ nums[JobFinished]++;
}
@Override
public void jobStartedNotification(String id, String assignedJobId) {
- System.out.println("id: " + id + " job started: " + assignedJobId);
+ System.out.println("id: " + id + " job started: " + assignedJobId);
int[] nums = numMap.get(id);
nums[JobStarted]++;
}
@@ -975,7 +976,7 @@ public class TestPigRunner {
@Override
public void launchCompletedNotification(String id, int numJobsSucceeded) {
- System.out.println("id: " + id + " numJobsSucceeded: " + numJobsSucceeded);
+ System.out.println("id: " + id + " numJobsSucceeded: " + numJobsSucceeded);
System.out.println("");
int[] nums = numMap.get(id);
assertEquals(nums[JobsToLaunch], numJobsSucceeded);
@@ -991,9 +992,9 @@ public class TestPigRunner {
@Override
public void progressUpdatedNotification(String id, int progress) {
- System.out.println("id: " + id + " progress: " + progress + "%");
+ System.out.println("id: " + id + " progress: " + progress + "%");
}
-
+
}
private void deleteAll(File d) {
@@ -1002,7 +1003,7 @@ public class TestPigRunner {
for (File f : d.listFiles()) {
deleteAll(f);
}
- }
- d.delete();
+ }
+ d.delete();
}
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Fri Feb 7 01:18:14 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
@@ -818,7 +819,7 @@ public class TestPigServer {
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
- assertEquals("true", properties.getProperty("opt.multiquery"));
+ assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
//Test with properties file
@@ -828,7 +829,7 @@ public class TestPigServer {
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
- assertEquals("true", properties.getProperty("opt.multiquery"));
+ assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
@@ -840,7 +841,7 @@ public class TestPigServer {
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("false", properties.getProperty("aggregate.warning"));
- assertEquals("false", properties.getProperty("opt.multiquery"));
+ assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
assertEquals("true", properties.getProperty("stop.on.failure"));
propertyFile.delete();
Modified: pig/branches/tez/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestStore.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestStore.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestStore.java Fri Feb 7 01:18:14 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
@@ -789,7 +790,7 @@ public class TestStore {
}
private void checkStorePath(String orig, String expected, boolean isTmp) throws Exception {
- pc.getProperties().setProperty("opt.multiquery",""+true);
+ pc.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY,""+true);
DataStorage dfs = pc.getDfs();
dfs.setActiveContainer(dfs.asContainer("/tmp"));
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld Fri Feb 7 01:18:14 2014
@@ -2,16 +2,17 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-196
+# TEZ DAG plan: scope-210
#--------------------------------------------------
-Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-174,Tez vertex scope-174,
+Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-163,Tez vertex scope-188,
Tez vertex scope-122
-Tez vertex scope-174
Tez vertex scope-140 -> Tez vertex scope-151,
Tez vertex scope-151 -> Tez vertex scope-153,
Tez vertex scope-153
Tez vertex scope-126
Tez vertex scope-109
+Tez vertex scope-163 -> Tez vertex scope-188,
+Tez vertex scope-188
Tez vertex scope-106
# Plan on vertex
@@ -51,13 +52,9 @@ Tez vertex scope-106
| | | | |
| | | | |---Constant(3) - scope-86
| | | |
-| | | f1: Split - scope-96
+| | | f1: Local Rearrange[tuple]{tuple}(false) - scope-162 -> scope-163
| | | | |
-| | | | f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
-| | | | |
-| | | | f2: Local Rearrange[tuple]{tuple}(false) - scope-173 -> scope-174
-| | | | | |
-| | | | | Project[tuple][*] - scope-172
+| | | | Project[tuple][*] - scope-161
| | | |
| | | |---f1: Limit - scope-95
| | | |
@@ -83,21 +80,21 @@ Tez vertex scope-106
| | | |
| | | Project[int][0] - scope-48
| | |
-| | c2: Local Rearrange[tuple]{int}(false) - scope-188 -> scope-126
+| | c2: Local Rearrange[tuple]{int}(false) - scope-202 -> scope-126
| | | |
-| | | Project[int][0] - scope-190
+| | | Project[int][0] - scope-204
| | |
-| | |---c3: New For Each(false,false)[bag] - scope-176
+| | |---c3: New For Each(false,false)[bag] - scope-190
| | | |
-| | | Project[int][0] - scope-177
+| | | Project[int][0] - scope-191
| | | |
-| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-178
+| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-192
| | | |
-| | | |---Project[bag][0] - scope-179
+| | | |---Project[bag][0] - scope-193
| | | |
-| | | |---Project[bag][1] - scope-180
+| | | |---Project[bag][1] - scope-194
| | |
-| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-191
+| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-205
| |
| |---c: Filter[bag] - scope-34
| | |
@@ -111,9 +108,9 @@ Tez vertex scope-106
| | |
| | d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
| | |
-| | f2: Local Rearrange[tuple]{tuple}(false) - scope-171 -> scope-174
+| | f2: Local Rearrange[tuple]{tuple}(false) - scope-185 -> scope-188
| | | |
-| | | Project[tuple][*] - scope-170
+| | | Project[tuple][*] - scope-184
| |
| |---d1: Filter[bag] - scope-73
| | |
@@ -153,11 +150,6 @@ c1: Store(file:///tmp/output/c1:org.apac
| Project[bag][2] - scope-52
|
|---c1: Package(Packager)[tuple]{int} - scope-46
-Tez vertex scope-174
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---f2: Package(Packager)[tuple]{tuple} - scope-175
Tez vertex scope-140
# Plan on vertex
Local Rearrange[tuple]{bytearray}(false) - scope-150 -> scope-151
@@ -193,19 +185,19 @@ e1: Store(file:///tmp/output/e1:org.apac
|---Package(LitePackager)[tuple]{int} - scope-154
Tez vertex scope-126
# Combine plan on edge <scope-106>
-c2: Local Rearrange[tuple]{int}(false) - scope-192 -> scope-126
+c2: Local Rearrange[tuple]{int}(false) - scope-206 -> scope-126
| |
-| Project[int][0] - scope-194
+| Project[int][0] - scope-208
|
-|---c3: New For Each(false,false)[bag] - scope-181
+|---c3: New For Each(false,false)[bag] - scope-195
| |
- | Project[int][0] - scope-182
+ | Project[int][0] - scope-196
| |
- | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-183
+ | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-197
| |
- | |---Project[bag][1] - scope-184
+ | |---Project[bag][1] - scope-198
|
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-187
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-201
# Plan on vertex
c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
|
@@ -215,7 +207,7 @@ c3: Store(file:///tmp/output/c1:org.apac
| |
| POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
| |
- | |---Project[bag][1] - scope-185
+ | |---Project[bag][1] - scope-199
|
|---c2: Package(CombinerPackager)[tuple]{int} - scope-58
Tez vertex scope-109
@@ -236,4 +228,26 @@ b1: Split - scope-20
| | |
| | |---Project[bag][1] - scope-28
|
-|---b1: Package(Packager)[tuple]{int} - scope-17
\ No newline at end of file
+|---b1: Package(Packager)[tuple]{int} - scope-17
+Tez vertex scope-163
+# Plan on vertex
+f1: Split - scope-96
+| |
+| f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
+| |
+| f2: Local Rearrange[tuple]{tuple}(false) - scope-187 -> scope-188
+| | |
+| | Project[tuple][*] - scope-186
+|
+|---f1: Limit - scope-167
+ |
+ |---f1: New For Each(true)[bag] - scope-166
+ | |
+ | Project[tuple][1] - scope-165
+ |
+ |---f1: Package(Packager)[tuple]{tuple} - scope-164
+Tez vertex scope-188
+# Plan on vertex
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
+|
+|---f2: Package(Packager)[tuple]{tuple} - scope-189
\ No newline at end of file