You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [16/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/ba...
Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri May 30 19:07:23 2014
@@ -31,49 +31,65 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
-import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestAccumulator {
- private static final String INPUT_FILE = "AccumulatorInput.txt";
+ private static final String INPUT_FILE1 = "AccumulatorInput1.txt";
private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
+ private static final String INPUT_DIR = "build/test/data";
- private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static PigServer pigServer;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ properties.setProperty("pig.accumulative.batchsize", "2");
+ properties.setProperty("pig.exec.nocombiner", "true");
+ // Reducing the number of retry attempts to speed up test completion
+ properties.setProperty("mapred.map.max.attempts","1");
+ properties.setProperty("mapred.reduce.max.attempts","1");
+ createFiles();
+ }
- public TestAccumulator() throws ExecException, IOException{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- // pigServer = new PigServer(ExecType.LOCAL);
- pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
- pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner", "true");
- // reducing the number of retry attempts to speed up test completion
- pigServer.getPigContext().getProperties().setProperty("mapred.map.max.attempts","1");
- pigServer.getPigContext().getProperties().setProperty("mapred.reduce.max.attempts","1");
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ deleteFiles();
+ cluster.shutDown();
}
@Before
public void setUp() throws Exception {
- pigServer.getPigContext().getProperties().remove("opt.accumulator");
- createFiles();
+ Util.resetStateForExecModeSwitch();
+ // Drop the accumulator setting that an earlier test may have left in the shared properties
+ properties.remove(PigConfiguration.OPT_ACCUMULATOR);
+ pigServer = new PigServer(cluster.getExecType(), properties);
}
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
+ @After
+ public void tearDown() throws Exception {
+ pigServer.shutdown();
+ pigServer = null;
}
- private void createFiles() throws IOException {
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ private static void createFiles() throws IOException {
+ new File(INPUT_DIR).mkdir();
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
w.println("100\tapple");
w.println("200\torange");
@@ -84,9 +100,9 @@ public class TestAccumulator {
w.println("400\tapple");
w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
- w = new PrintWriter(new FileWriter(INPUT_FILE2));
+ w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
w.println("100\t");
w.println("100\t");
@@ -95,9 +111,9 @@ public class TestAccumulator {
w.println("300\tstrawberry");
w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
- w = new PrintWriter(new FileWriter(INPUT_FILE3));
+ w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3));
w.println("100\t1.0");
w.println("100\t2.0");
@@ -112,9 +128,9 @@ public class TestAccumulator {
w.println("400\t");
w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
- w = new PrintWriter(new FileWriter(INPUT_FILE4));
+ w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4));
w.println("100\thttp://ibm.com,ibm");
w.println("100\thttp://ibm.com,ibm");
@@ -122,25 +138,17 @@ public class TestAccumulator {
w.println("300\thttp://sun.com,sun");
w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4);
}
- @After
- public void tearDown() throws Exception {
- new File(INPUT_FILE).delete();
- Util.deleteFile(cluster, INPUT_FILE);
- new File(INPUT_FILE2).delete();
- Util.deleteFile(cluster, INPUT_FILE2);
- new File(INPUT_FILE3).delete();
- Util.deleteFile(cluster, INPUT_FILE3);
- new File(INPUT_FILE4).delete();
- Util.deleteFile(cluster, INPUT_FILE4);
+ private static void deleteFiles() {
+ Util.deleteDirectory(new File(INPUT_DIR));
}
@Test
public void testAccumBasic() throws IOException{
// test group by
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);");
@@ -150,7 +158,6 @@ public class TestAccumulator {
expected.put(300, 3);
expected.put(400, 1);
-
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
@@ -175,8 +182,8 @@ public class TestAccumulator {
}
// test cogroup
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
- pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("C = cogroup A by id, B by id;");
pigServer.registerQuery("D = foreach C generate group, " +
"org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");
@@ -187,7 +194,6 @@ public class TestAccumulator {
expected2.put(300, "3,3");
expected2.put(400, "1,1");
-
iter = pigServer.openIterator("D");
while(iter.hasNext()) {
@@ -198,7 +204,7 @@ public class TestAccumulator {
@Test
public void testAccumWithNegative() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);");
@@ -219,7 +225,7 @@ public class TestAccumulator {
@Test
public void testAccumWithAdd() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");
@@ -261,7 +267,7 @@ public class TestAccumulator {
@Test
public void testAccumWithMinus() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
" org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");
@@ -283,7 +289,7 @@ public class TestAccumulator {
@Test
public void testAccumWithMod() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
"org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");
@@ -305,7 +311,7 @@ public class TestAccumulator {
@Test
public void testAccumWithDivide() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
"org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");
@@ -327,7 +333,7 @@ public class TestAccumulator {
@Test
public void testAccumWithAnd() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
"((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
@@ -350,7 +356,7 @@ public class TestAccumulator {
@Test
public void testAccumWithOr() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
"((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
@@ -373,7 +379,7 @@ public class TestAccumulator {
@Test
public void testAccumWithRegexp() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, " +
"(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");
@@ -417,7 +423,7 @@ public class TestAccumulator {
@Test
public void testAccumWithIsEmpty() throws IOException{
pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1");
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
pigServer.registerQuery("C = cogroup A by id outer, B by id outer;");
pigServer.registerQuery("D = foreach C generate group," +
@@ -442,9 +448,10 @@ public class TestAccumulator {
@Test
public void testAccumWithDistinct() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
pigServer.registerQuery("B = group A by id;");
- pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
+ pigServer.registerQuery("C = foreach B { D = distinct A;" +
+ "generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
expected.put(100, 2);
@@ -454,18 +461,22 @@ public class TestAccumulator {
Iterator<Tuple> iter = pigServer.openIterator("C");
+ int count = 0;
while(iter.hasNext()) {
+ count++;
Tuple t = iter.next();
assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
}
+ assertEquals(4, count);
}
@Test
public void testAccumWithSort() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
pigServer.registerQuery("B = foreach A generate id, f, id as t;");
pigServer.registerQuery("C = group B by id;");
- pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
+ pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f;" +
+ "generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
HashMap<Integer, String> expected = new HashMap<Integer, String>();
expected.put(100, "(apple)(apple)");
@@ -475,13 +486,17 @@ public class TestAccumulator {
Iterator<Tuple> iter = pigServer.openIterator("D");
+ int count = 0;
while(iter.hasNext()) {
+ count++;
Tuple t = iter.next();
assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
}
+ assertEquals(4, count);
}
- public void testAccumWithBuildinAvg() throws IOException {
+ @Test
+ public void testAccumWithBuiltinAvg() throws IOException {
HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
expected.put(100, 3.0);
expected.put(200, 2.1);
@@ -490,31 +505,31 @@ public class TestAccumulator {
// Test all the averages for correct behaviour with null values
String[] types = { "double", "float", "int", "long" };
for (int i = 0; i < types.length; i++) {
- if (i > 1) { // adjust decimal error for non real types
- expected.put(200, 2.0);
- expected.put(300, 3.0);
- }
- pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
- + types[i] + ");");
- pigServer.registerQuery("C = group A by id;");
- pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
- Iterator<Tuple> iter = pigServer.openIterator("D");
-
- while (iter.hasNext()) {
- Tuple t = iter.next();
- Double v = expected.get((Integer) t.get(0));
- if (v != null) {
- assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
- 0.0001);
- } else {
- assertNull(t.get(1));
+ if (i > 1) { // adjust decimal error for non real types
+ expected.put(200, 2.0);
+ expected.put(300, 3.0);
+ }
+ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
+ + types[i] + ");");
+ pigServer.registerQuery("C = group A by id;");
+ pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while (iter.hasNext()) {
+ Tuple t = iter.next();
+ Double v = expected.get((Integer) t.get(0));
+ if (v != null) {
+ assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
+ 0.0001);
+ } else {
+ assertNull(t.get(1));
+ }
}
- }
}
}
@Test
- public void testAccumWithBuildin() throws IOException{
+ public void testAccumWithBuiltin() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
pigServer.registerQuery("C = group A by id;");
// moving AVG accumulator test to separate test case
@@ -532,19 +547,19 @@ public class TestAccumulator {
Tuple t = iter.next();
Double[] v = expected.get((Integer)t.get(0));
for(int i=0; i<v.length; i++) {
- if (v[i] != null) {
- assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
- .doubleValue(), 0.0001);
- } else {
- assertNull(t.get(i + 1));
- }
+ if (v[i] != null) {
+ assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
+ .doubleValue(), 0.0001);
+ } else {
+ assertNull(t.get(i + 1));
+ }
}
}
}
@Test
- public void testAccumWithMultiBuildin() throws IOException{
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, c:chararray);");
+ public void testAccumWithMultiBuiltin() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");
pigServer.registerQuery("C = group A by 1;");
pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);");
@@ -564,7 +579,7 @@ public class TestAccumulator {
pigServer.registerQuery("C = group A by id;");
pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
- Iterator<Tuple> iter = pigServer.openIterator("D");
+ pigServer.openIterator("D");
}
/**
@@ -615,24 +630,25 @@ public class TestAccumulator {
@Test
public void testAccumulatorOff() throws IOException{
- pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
+ pigServer.getPigContext().getProperties().setProperty(
+ PigConfiguration.OPT_ACCUMULATOR, "false");
pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
- pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+ pigServer.registerQuery("C = foreach B generate group," +
+ "org.apache.pig.test.utils.AccumulativeSumBag(A);");
checkAccumulatorOff("C");
- pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+ pigServer.getPigContext().getProperties().setProperty(
+ PigConfiguration.OPT_ACCUMULATOR, "true");
}
private void checkAccumulatorOff(String alias) {
try {
Iterator<Tuple> iter = pigServer.openIterator(alias);
- int c = 0;
while(iter.hasNext()) {
iter.next();
- c++;
}
fail("Accumulator should be off.");
}catch(Exception e) {
@@ -644,7 +660,8 @@ public class TestAccumulator {
public void testAccumWithMap() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
pigServer.registerQuery("B = group A by (id, url);");
- pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
+ pigServer.registerQuery("C = foreach B generate COUNT(A)," +
+ "org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
HashMap<Integer, String> expected = new HashMap<Integer, String>();
expected.put(2, "http://ibm.com");
@@ -679,7 +696,8 @@ public class TestAccumulator {
pigServer.registerQuery("A = load 'data1' as (x:int, y:int);");
pigServer.registerQuery("B = load 'data2' as (x:int, z:int);");
pigServer.registerQuery("C = cogroup A by x, B by x;");
- pigServer.registerQuery("D = foreach C generate group, SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");
+ pigServer.registerQuery("D = foreach C generate group," +
+ "SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");
HashMap<Integer, Long> expected = new HashMap<Integer, Long>();
expected.put(1, 21l);
@@ -704,7 +722,7 @@ public class TestAccumulator {
@Test
public void testAccumAfterNestedOp() throws IOException, ParserException{
// test group by
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B " +
"{ o = order A by id; " +
Modified: pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Fri May 30 19:07:23 2014
@@ -23,26 +23,35 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Iterator;
+import java.util.Properties;
import java.util.Random;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestAlgebraicEval {
+ private static PigServer pig;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
private int LOOP_COUNT = 1024;
-
- private PigServer pig;
+ private Boolean[] nullFlags = new Boolean[]{ false, true};
@Before
public void setUp() throws Exception {
- pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pig = new PigServer(cluster.getExecType(), properties);
+ }
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
}
@AfterClass
@@ -50,9 +59,6 @@ public class TestAlgebraicEval {
cluster.shutDown();
}
- Boolean[] nullFlags = new Boolean[]{ false, true};
-
- static MiniCluster cluster = MiniCluster.buildCluster();
@Test
public void testGroupCountWithMultipleFields() throws Throwable {
File tmpFile = File.createTempFile("test", "txt");
Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Fri May 30 19:07:23 2014
@@ -33,15 +33,12 @@ import java.util.Iterator;
import java.util.Properties;
import java.util.Map.Entry;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataType;
@@ -50,34 +47,39 @@ import org.apache.pig.impl.PigContext;
import org.apache.tools.bzip2r.CBZip2InputStream;
import org.apache.tools.bzip2r.CBZip2OutputStream;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-@RunWith(JUnit4.class)
public class TestBZip {
- static MiniCluster cluster = MiniCluster.buildCluster();
-
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
/**
* Tests the end-to-end writing and reading of a BZip file.
*/
@Test
public void testBzipInPig() throws Exception {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
File in = File.createTempFile("junit", ".bz2");
in.deleteOnExit();
-
+
File out = File.createTempFile("junit", ".bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
-
- CBZip2OutputStream cos =
+
+ CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 1; i < 100; i++) {
StringBuffer sb = new StringBuffer();
@@ -86,7 +88,7 @@ public class TestBZip {
cos.write(bytes);
}
cos.close();
-
+
pig.registerQuery("AA = load '"
+ Util.generateURI(Util.encodeEscape(in.getAbsolutePath()), pig.getPigContext())
+ "';");
@@ -94,49 +96,50 @@ public class TestBZip {
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FSDataInputStream is = fs.open(new Path(clusterOutput +
- "/part-r-00000.bz2"));
+ FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
+ Util.getSuccessMarkerPathFilter());
+ FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-
+
// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
assertEquals(100, cis.read(new byte[100]));
cis.close();
-
+
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
-
+
Iterator<Tuple> i = pig.openIterator("B");
HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
while (i.hasNext()) {
Integer val = DataType.toInteger(i.next().get(0));
- map.put(val, val);
+ map.put(val, val);
}
-
+
assertEquals(new Integer(99), new Integer(map.keySet().size()));
-
+
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}
-
+
in.delete();
Util.deleteFile(cluster, clusterOutput);
}
-
+
/**
* Tests the end-to-end writing and reading of a BZip file using absolute path with a trailing /.
*/
@Test
public void testBzipInPig2() throws Exception {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
File in = File.createTempFile("junit", ".bz2");
in.deleteOnExit();
-
+
File out = File.createTempFile("junit", ".bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
-
- CBZip2OutputStream cos =
+
+ CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 1; i < 100; i++) {
StringBuffer sb = new StringBuffer();
@@ -145,7 +148,7 @@ public class TestBZip {
cos.write(bytes);
}
cos.close();
-
+
pig.registerQuery("AA = load '"
+ Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
+ "';");
@@ -153,30 +156,31 @@ public class TestBZip {
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "/';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FSDataInputStream is = fs.open(new Path(clusterOutput +
- "/part-r-00000.bz2"));
+ FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
+ Util.getSuccessMarkerPathFilter());
+ FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-
+
// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
assertEquals(100, cis.read(new byte[100]));
cis.close();
-
+
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
-
+
Iterator<Tuple> i = pig.openIterator("B");
HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
while (i.hasNext()) {
Integer val = DataType.toInteger(i.next().get(0));
- map.put(val, val);
+ map.put(val, val);
}
-
+
assertEquals(new Integer(99), new Integer(map.keySet().size()));
-
+
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}
-
+
in.delete();
out.delete();
}
@@ -190,15 +194,15 @@ public class TestBZip {
"7\t8", // '\n' case
"9\t10\r" // '\r\n' at the end of file
};
-
+
// bzip compressed input
File in = File.createTempFile("junit", ".bz2");
String compressedInputFileName = in.getAbsolutePath();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
in.deleteOnExit();
-
+
try {
- CBZip2OutputStream cos =
+ CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 0; i < inputData.length; i++) {
StringBuffer sb = new StringBuffer();
@@ -207,31 +211,30 @@ public class TestBZip {
cos.write(bytes);
}
cos.close();
-
+
Util.copyFromLocalToCluster(cluster, compressedInputFileName,
- clusterCompressedFilePath);
-
+ clusterCompressedFilePath);
+
// pig script to read compressed input
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
// pig script to read compressed input
String script ="a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
pig.registerQuery(script);
-
+
pig.registerQuery("store a into 'intermediate.bz';");
pig.registerQuery("b = load 'intermediate.bz';");
Iterator<Tuple> it2 = pig.openIterator("b");
- while (it2.hasNext()) {
- it2.next();
- }
+ while (it2.hasNext()) {
+ it2.next();
+ }
} finally {
in.delete();
Util.deleteFile(cluster, "intermediate.bz");
Util.deleteFile(cluster, "final.bz");
}
}
- /**
+ /**
* Tests that '\n', '\r' and '\r\n' are treated as record delims when using
* bzip just like they are when using uncompressed text
*/
@@ -243,7 +246,7 @@ public class TestBZip {
"7\t8", // '\n' case
"9\t10\r" // '\r\n' at the end of file
};
-
+
// bzip compressed input
File in = File.createTempFile("junit", ".bz2");
String compressedInputFileName = in.getAbsolutePath();
@@ -252,9 +255,9 @@ public class TestBZip {
String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputData);
-
+
try {
- CBZip2OutputStream cos =
+ CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 0; i < inputData.length; i++) {
StringBuffer sb = new StringBuffer();
@@ -263,58 +266,56 @@ public class TestBZip {
cos.write(bytes);
}
cos.close();
-
+
Util.copyFromLocalToCluster(cluster, compressedInputFileName,
clusterCompressedFilePath);
-
+
// pig script to read uncompressed input
String script = "a = load '" + unCompressedInputFileName +"';";
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery(script);
Iterator<Tuple> it1 = pig.openIterator("a");
-
+
// pig script to read compressed input
script = "a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
pig.registerQuery(script);
Iterator<Tuple> it2 = pig.openIterator("a");
-
+
while(it1.hasNext()) {
Tuple t1 = it1.next();
Tuple t2 = it2.next();
- Assert.assertEquals(t1, t2);
+ assertEquals(t1, t2);
}
-
- Assert.assertFalse(it2.hasNext());
-
+
+ assertFalse(it2.hasNext());
+
} finally {
in.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
Util.deleteFile(cluster, clusterCompressedFilePath);
}
-
+
}
-
+
/**
* Tests the end-to-end writing and reading of an empty BZip file.
*/
@Test
public void testEmptyBzipInPig() throws Exception {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
File in = File.createTempFile("junit", ".tmp");
in.deleteOnExit();
File out = File.createTempFile("junit", ".bz2");
out.delete();
String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
-
+
FileOutputStream fos = new FileOutputStream(in);
fos.write("55\n".getBytes());
fos.close();
System.out.println(in.getAbsolutePath());
-
+
pig.registerQuery("AA = load '"
+ Util.generateURI(Util.encodeEscape(in.getAbsolutePath()), pig.getPigContext())
+ "';");
@@ -322,21 +323,22 @@ public class TestBZip {
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutputFilePath) + "';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FSDataInputStream is = fs.open(new Path(clusterOutputFilePath +
- "/part-r-00000.bz2"));
+ FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutputFilePath),
+ Util.getSuccessMarkerPathFilter());
+ FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-
+
// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
assertEquals(-1, cis.read(new byte[100]));
cis.close();
-
+
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
pig.openIterator("B");
-
+
in.delete();
Util.deleteFile(cluster, clusterOutputFilePath);
-
+
}
/**
@@ -357,7 +359,7 @@ public class TestBZip {
cis.close();
tmp.delete();
}
-
+
/**
* Tests the case where a bzip block ends exactly at the end of the {@link InputSplit}
* with the block header ending a few bits into the last byte of current
@@ -371,11 +373,11 @@ public class TestBZip {
// test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
// In this test we will load test/org/apache/pig/test/data/bzipdir1.bz2 to also
// test that the BZip2TextInputFormat can read subdirs recursively
- String inputFileName =
+ String inputFileName =
"test/org/apache/pig/test/data/bzipdir1.bz2";
Long expectedCount = 74999L; // number of lines in above file
- // the first block in the above file exactly ends a few bits into the
- // byte at position 136500
+ // the first block in the above file exactly ends a few bits into the
+ // byte at position 136500
int splitSize = 136500;
try {
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -385,21 +387,21 @@ public class TestBZip {
Util.deleteFile(cluster, inputFileName);
}
}
-
+
/**
- * Tests the case where a bzip block ends exactly at the end of the input
+ * Tests the case where a bzip block ends exactly at the end of the input
* split (byte aligned with the last byte) and the last byte is a carriage
* return.
*/
@Test
public void testBlockHeaderEndingWithCR() throws IOException {
- String inputFileName =
+ String inputFileName =
"test/org/apache/pig/test/data/blockEndingInCR.txt.bz2";
// number of lines in above file (the value is 1 more than bzcat | wc -l
// since there is a '\r' which is also treated as a record delim
- Long expectedCount = 82094L;
- // the first block in the above file exactly ends at the byte at
- // position 136498 and the last byte is a carriage return ('\r')
+ Long expectedCount = 82094L;
+ // the first block in the above file exactly ends at the byte at
+ // position 136498 and the last byte is a carriage return ('\r')
try {
int splitSize = 136498;
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -408,21 +410,21 @@ public class TestBZip {
Util.deleteFile(cluster, inputFileName);
}
}
-
+
/**
* Tests the case where a bzip block ends exactly at the end of the input
* split and has more data which results in overcounting (record duplication)
* in Pig 0.6
- *
+ *
*/
@Test
public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
-
- String inputFileName =
+
+ String inputFileName =
"test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
Long expectedCount = 1041046L; // number of lines in above file
- // the first block in the above file exactly ends a few bits into the
- // byte at position 136500
+ // the first block in the above file exactly ends a few bits into the
+ // byte at position 136500
int splitSize = 136500;
try {
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -431,29 +433,28 @@ public class TestBZip {
Util.deleteFile(cluster, inputFileName);
}
}
-
- private void testCount(String inputFileName, Long expectedCount,
+
+ private void testCount(String inputFileName, Long expectedCount,
int splitSize, String loadFuncSpec) throws IOException {
String outputFile = "/tmp/bz-output";
// simple load-store script to verify that the bzip input is getting
// split
String scriptToTestSplitting = "a = load '" +inputFileName + "' using " +
loadFuncSpec + "; store a into '" + outputFile + "';";
-
+
String script = "a = load '" + inputFileName + "';" +
- "b = group a all;" +
- "c = foreach b generate COUNT_STAR(a);";
+ "b = group a all;" +
+ "c = foreach b generate COUNT_STAR(a);";
Properties props = new Properties();
- for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+ for (Entry<Object, Object> entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
- PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
- PigServer pig = new PigServer(pigContext);
+ PigServer pig = new PigServer(cluster.getExecType(), props);
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
fs.delete(new Path(outputFile), true);
Util.registerMultiLineQuery(pig, scriptToTestSplitting);
-
+
// verify that > 1 maps were launched due to splitting of the bzip input
FileStatus[] files = fs.listStatus(new Path(outputFile));
int numPartFiles = 0;
@@ -463,40 +464,41 @@ public class TestBZip {
}
}
assertEquals(true, numPartFiles > 0);
-
+
// verify record count to verify we read bzip data correctly
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("c");
Long result = (Long) it.next().get(0);
assertEquals(expectedCount, result);
-
+
}
-
+
@Test
public void testBzipStoreInMultiQuery() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
-
+
String inputFileName = "input.txt";
Util.createInputFile(cluster, inputFileName, inputData);
-
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
pig.setBatchOn();
pig.registerQuery("a = load '" + inputFileName + "';");
pig.registerQuery("store a into 'output.bz2';");
pig.registerQuery("store a into 'output';");
pig.executeBatch();
-
+
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FileStatus stat = fs.getFileStatus(new Path("output/part-m-00000"));
- assertTrue(stat.getLen() > 0);
-
- stat = fs.getFileStatus(new Path("output.bz2/part-m-00000.bz2"));
- assertTrue(stat.getLen() > 0);
+ FileStatus[] outputFiles = fs.listStatus(new Path("output"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
}
@Test
@@ -504,34 +506,35 @@ public class TestBZip {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
-
+
String inputFileName = "input2.txt";
Util.createInputFile(cluster, inputFileName, inputData);
-
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
+
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
PigContext pigContext = pig.getPigContext();
pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
-
+
pig.setBatchOn();
pig.registerQuery("a = load '" + inputFileName + "';");
pig.registerQuery("store a into 'output2.bz2';");
pig.registerQuery("store a into 'output2';");
pig.executeBatch();
-
+
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FileStatus stat = fs.getFileStatus(new Path("output2/part-m-00000.bz2"));
- assertTrue(stat.getLen() > 0);
-
- stat = fs.getFileStatus(new Path("output2.bz2/part-m-00000.bz2"));
- assertTrue(stat.getLen() > 0);
+ FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output2.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
}
-
- /**
+
+ /**
* Tests that Pig throws an Exception when the input files to be loaded are actually
- * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part
+ * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part
* of the input data.
*/
@Test (expected=IOException.class)
@@ -550,12 +553,12 @@ public class TestBZip {
"1\tb",
"2\tbb"
};
-
+
// bzip compressed input file1
File in1 = File.createTempFile("junit", ".bz2");
String compressedInputFileName1 = in1.getAbsolutePath();
in1.deleteOnExit();
-
+
// file2
File in2 = File.createTempFile("junit", ".bz2");
String compressedInputFileName2 = in2.getAbsolutePath();
@@ -563,9 +566,9 @@ public class TestBZip {
String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
-
+
try {
- CBZip2OutputStream cos =
+ CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in1));
for (int i = 0; i < inputData1.length; i++) {
StringBuffer sb = new StringBuffer();
@@ -574,8 +577,8 @@ public class TestBZip {
cos.write(bytes);
}
cos.close();
-
- CBZip2OutputStream cos2 =
+
+ CBZip2OutputStream cos2 =
new CBZip2OutputStream(new FileOutputStream(in2));
for (int i = 0; i < inputData2.length; i++) {
StringBuffer sb = new StringBuffer();
@@ -589,56 +592,55 @@ public class TestBZip {
catInto(compressedInputFileName2, compressedInputFileName1);
Util.copyFromLocalToCluster(cluster, compressedInputFileName1,
compressedInputFileName1);
-
+
// pig script to read uncompressed input
String script = "a = load '" + Util.encodeEscape(unCompressedInputFileName) +"';";
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery(script);
Iterator<Tuple> it1 = pig.openIterator("a");
-
+
// pig script to read compressed concatenated input
script = "a = load '" + Util.encodeEscape(compressedInputFileName1) +"';";
pig.registerQuery(script);
Iterator<Tuple> it2 = pig.openIterator("a");
-
+
while(it1.hasNext()) {
Tuple t1 = it1.next();
Tuple t2 = it2.next();
- Assert.assertEquals(t1, t2);
+ assertEquals(t1, t2);
}
-
- Assert.assertFalse(it2.hasNext());
-
+
+ assertFalse(it2.hasNext());
+
} finally {
in1.delete();
in2.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
}
-
+
}
-
+
/*
* Concatenate the contents of src file to the contents of dest file
*/
private void catInto(String src, String dest) throws IOException {
- BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
- BufferedReader in = new BufferedReader(new FileReader(src));
- String str;
- while ((str = in.readLine()) != null) {
- out.write(str);
- }
- in.close();
- out.close();
+ BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
+ BufferedReader in = new BufferedReader(new FileReader(src));
+ String str;
+ while ((str = in.readLine()) != null) {
+ out.write(str);
+ }
+ in.close();
+ out.close();
}
-
+
// See PIG-1714
@Test
public void testBzipStoreInMultiQuery3() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
-
+
String inputFileName = "input3.txt";
Util.createInputFile(cluster, inputFileName, inputData);
@@ -649,25 +651,27 @@ public class TestBZip {
"a = load '" + inputFileName + "';\n" +
"store a into 'output3.bz2';\n" +
"store a into 'output3';";
-
+
String inputScriptName = "script3.txt";
PrintWriter pw = new PrintWriter(new FileWriter(inputScriptName));
pw.println(inputScript);
pw.close();
-
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
FileInputStream fis = new FileInputStream(inputScriptName);
pig.registerScript(fis);
-
+
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
- FileStatus stat = fs.getFileStatus(new Path("output3/part-m-00000.bz2"));
- assertTrue(stat.getLen() > 0);
-
- stat = fs.getFileStatus(new Path("output3.bz2/part-m-00000.bz2"));
- assertTrue(stat.getLen() > 0);
+ FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output3.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
}
-
+
}
+
Modified: pig/trunk/test/org/apache/pig/test/TestBatchAliases.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBatchAliases.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBatchAliases.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBatchAliases.java Fri May 30 19:07:23 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
@@ -37,7 +38,7 @@ public class TestBatchAliases {
@Before
public void setUp() throws Exception {
- System.setProperty("opt.multiquery", ""+true);
+ System.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
myPig = new PigServer(ExecType.LOCAL, new Properties());
deleteOutputFiles();
}
Modified: pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBestFitCast.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBestFitCast.java Fri May 30 19:07:23 2014
@@ -17,16 +17,15 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -40,17 +39,20 @@ import org.apache.pig.impl.util.LogUtils
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestBestFitCast {
- private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
- String inputFile, inputFile2;
- int LOOP_SIZE = 20;
+ private static PigServer pigServer;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ private String inputFile, inputFile2;
+ private int LOOP_SIZE = 20;
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), properties);
inputFile = "TestBestFitCast-input.txt";
String[] input = new String[LOOP_SIZE];
long l = 0;
@@ -73,6 +75,12 @@ public class TestBestFitCast {
Util.deleteFile(cluster, inputFile2);
}
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
@@ -233,7 +241,7 @@ public class TestBestFitCast {
try {
pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x:float, y);");
pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName() + "(x,y);");
- Iterator<Tuple> iter = pigServer.openIterator("B");
+ pigServer.openIterator("B");
} catch (Exception e) {
exceptionCaused = true;
PigException pe = LogUtils.getPigException(e);
@@ -319,7 +327,7 @@ public class TestBestFitCast {
pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:int);");
pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
+ "(x,y, y);");
- Iterator<Tuple> iter = pigServer.openIterator("B");
+ pigServer.openIterator("B");
} catch (Exception e) {
exceptionCaused = true;
PigException pe = LogUtils.getPigException(e);
@@ -341,7 +349,7 @@ public class TestBestFitCast {
pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:long);");
pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
+ "(x,y, y);");
- Iterator<Tuple> iter = pigServer.openIterator("B");
+ pigServer.openIterator("B");
} catch (Exception e) {
exceptionCaused = true;
PigException pe = LogUtils.getPigException(e);
@@ -363,7 +371,7 @@ public class TestBestFitCast {
pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:double);");
pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
+ "(x,y, y);");
- Iterator<Tuple> iter = pigServer.openIterator("B");
+ pigServer.openIterator("B");
} catch (Exception e) {
exceptionCaused = true;
PigException pe = LogUtils.getPigException(e);
@@ -476,7 +484,7 @@ public class TestBestFitCast {
pigServer.registerQuery("A = LOAD '" + inputFile2 + "' as (x:float, y, z:int);");
pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
+ "(x,y, y);");
- Iterator<Tuple> iter = pigServer.openIterator("B");
+ pigServer.openIterator("B");
} catch (Exception e) {
exceptionCaused = true;
PigException pe = LogUtils.getPigException(e);
Modified: pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java Fri May 30 19:07:23 2014
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,11 +19,10 @@
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.junit.AfterClass;
@@ -32,12 +30,11 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestBinaryExpressionOps {
-
- private static MiniCluster cluster = MiniCluster.buildCluster();
-
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
private static final String INPUT_1 = "input1";
private static final String INPUT_2 = "input2";
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
String[] inputData1 = new String[] {
@@ -46,34 +43,39 @@ public class TestBinaryExpressionOps {
String[] inputData2 = new String[] {
"id1\t2", "id2\t2"
};
- Util.createInputFile(cluster, INPUT_1, inputData1);
+ Util.createInputFile(cluster, INPUT_1, inputData1);
Util.createInputFile(cluster, INPUT_2, inputData2);
}
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
-
+
@Test
public void testArithmeticOperators() throws Exception {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+
pig.registerQuery("A = LOAD '" + INPUT_1 + "' AS (id:chararray, val:long);");
pig.registerQuery("B = LOAD '" + INPUT_2 + "' AS (id:chararray, val:long);");
pig.registerQuery("C = COGROUP A BY id, B BY id;");
pig.registerQuery("D = FOREACH C GENERATE group, SUM(B.val), SUM(A.val), "
- + "(SUM(A.val) - SUM(B.val)), (SUM(A.val) + SUM(B.val)), "
- + "(SUM(A.val) * SUM(B.val)), (SUM(A.val) / SUM(B.val)), "
+ + "(SUM(A.val) - SUM(B.val)), (SUM(A.val) + SUM(B.val)), "
+ + "(SUM(A.val) * SUM(B.val)), (SUM(A.val) / SUM(B.val)), "
+ "(SUM(A.val) % SUM(B.val)), (SUM(A.val) < 0 ? SUM(A.val) : SUM(B.val));");
-
- String[] expectedResults = new String[] {"(id1,2,,,,,,,)", "(id2,2,10,8,12,20,5,0,2)"};
- Iterator<Tuple> iter = pig.openIterator("D");
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults[counter++], iter.next().toString());
- }
- assertEquals(expectedResults.length, counter);
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('id1',2L,null,null,null,null,null,null,null)",
+ "('id2',2L,10L,8L,12L,20L,5L,0L,2L)" });
+ Iterator<Tuple> iter = pig.openIterator("D");
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
}
-
+
}
Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri May 30 19:07:23 2014
@@ -121,7 +121,6 @@ import org.apache.pig.data.DefaultBagFac
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -132,17 +131,16 @@ import org.joda.time.DateTimeZone;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestBuiltin {
+ private static PigServer pigServer;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
- PigServer pigServer;
-
- // This should only be used when absolutely necessary -- eg, when using ReadToEndLoader.
- private static MiniCluster cluster = MiniCluster.buildCluster();
-
- TupleFactory tupleFactory = TupleFactory.getInstance();
- BagFactory bagFactory = DefaultBagFactory.getInstance();
+ private TupleFactory tupleFactory = TupleFactory.getInstance();
+ private BagFactory bagFactory = DefaultBagFactory.getInstance();
// some inputs
private static Integer[] intInput = { 3, 1, 2, 4, 5, 7, null, 6, 8, 9, 10 };
@@ -203,10 +201,7 @@ public class TestBuiltin {
@Before
public void setUp() throws Exception {
- // re initialize FileLocalizer so that each test will run correctly
- // without any side effect of other tests - this is needed since some
- // tests are in mapred and some in local mode.
- FileLocalizer.setInitialized(false);
+ Util.resetStateForExecModeSwitch();
pigServer = new PigServer(ExecType.LOCAL, new Properties());
pigServer.setValidateEachStatement(true);
@@ -347,14 +342,17 @@ public class TestBuiltin {
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
}
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void shutDown() {
cluster.shutDown();
}
- /**
- *
- */
private void setupEvalFuncMap() {
for (String[] aggGroup : aggs) {
for (String agg : aggGroup) {
@@ -2114,13 +2112,6 @@ public class TestBuiltin {
assertEquals(0.582222509739582, (Double)ans.get(2) ,0.0005);
}
- private void checkItemsGT(Iterable<Tuple> tuples, int field, int limit) throws ExecException {
- for (Tuple t : tuples) {
- Long val = (Long) t.get(field);
- assertTrue("Value "+ val + " exceeded the expected limit", val > limit);
- }
- }
-
@Test
public void testToBag() throws Exception {
//TEST TOBAG
@@ -2599,7 +2590,7 @@ public class TestBuiltin {
String input3 = "this:has:a:trailing:colon:\n";
int arity3 = 6;
- Util.createInputFile(cluster, "input.txt", new String[] {input2});
+ Util.createInputFile(cluster, "input.txt", new String[] {input3});
LoadFunc p3 = new ReadToEndLoader(new PigStorage(":"), ConfigurationUtil.
toConfiguration(cluster.getProperties()), "input.txt", 0);
Tuple f3 = p3.getNext();
@@ -2637,10 +2628,10 @@ public class TestBuiltin {
assertTrue(f3 == null);
}
- @SuppressWarnings("unchecked")
@Test
public void testSFPig() throws Exception {
- PigServer mrPigServer = new PigServer(ExecType.MAPREDUCE);
+ Util.resetStateForExecModeSwitch();
+ PigServer mrPigServer = new PigServer(cluster.getExecType(), properties);
String inputStr = "amy\tbob\tcharlene\tdavid\terin\tfrank";
Util.createInputFile(cluster, "testSFPig-input.txt", new String[]
{inputStr});
@@ -2670,7 +2661,6 @@ public class TestBuiltin {
* unit tests are done in TestStringUDFs
*/
@Test
- @SuppressWarnings("unchecked")
public void testStringUDFs() throws Exception {
String inputStr = "amy smith ";
File inputFile = Util.createInputFile("tmp", "testStrUDFsIn.txt", new String[] {inputStr});
Modified: pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java Fri May 30 19:07:23 2014
@@ -25,9 +25,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -40,13 +40,14 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestCharArrayToNumeric {
- private Double dummyDouble = null;
- private Float dummyFloat = null;
- private Long dummyLong = null;
- private Integer dummyInteger = null;
+ private static PigServer pig;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
private Double MaxDouble = Double.MIN_VALUE;
private Double MinDouble = Double.MIN_VALUE;
private Float MaxFloat = Float.MAX_VALUE;
@@ -56,12 +57,15 @@ public class TestCharArrayToNumeric {
private Integer MaxInteger = Integer.MAX_VALUE;
private Integer MinInteger = Integer.MIN_VALUE;
- static MiniCluster cluster = MiniCluster.buildCluster();
- PigServer pig;
-
@Before
public void setUp() throws Exception {
- pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pig = new PigServer(cluster.getExecType(), properties);
+ }
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
}
@AfterClass
Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Fri May 30 19:07:23 2014
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Properties;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
@@ -41,27 +40,40 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestCombiner {
+ private static MiniGenericCluster cluster;
+ private static Properties properties;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
+ @Before
+ public void setUp() throws Exception {
+ Util.resetStateForExecModeSwitch();
+ }
+
@Test
public void testSuccessiveUserFuncs1() throws Exception {
String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
"c = group a by c2; " +
"f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
"store f into 'out';";
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
+ pigServer.shutdown();
}
@Test
@@ -72,41 +84,27 @@ public class TestCombiner {
"f = foreach c generate COUNT(" + dummyUDF + "" +
"(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
"store f into 'out';";
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
+ pigServer.shutdown();
}
@Test
public void testOnCluster() throws Exception {
// run the test on cluster
- String inputFileName = runTest(new PigServer(
- ExecType.MAPREDUCE, cluster.getProperties()));
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+ String inputFileName = runTest(pigServer);
Util.deleteFile(cluster, inputFileName);
-
- }
-
- /*
- * (non-Javadoc)
- * @see junit.framework.TestCase#setUp()
- */
- @Before
- public void setUp() throws Exception {
- // cause a re initialization of FileLocalizer's
- // internal state before each test run
- // A previous test might have been in a different
- // mode than the test which is about to run. To
- // ensure each test runs correctly in it's exectype
- // mode, let's re initialize.
- FileLocalizer.setInitialized(false);
+ pigServer.shutdown();
}
@Test
public void testLocal() throws Exception {
// run the test locally
FileLocalizer.deleteTempFiles();
- runTest(new PigServer(ExecType.LOCAL, new Properties()));
+ runTest(new PigServer("local"));
FileLocalizer.deleteTempFiles();
}
@@ -133,7 +131,7 @@ public class TestCombiner {
File inputFile = File.createTempFile("test", "txt");
inputFile.deleteOnExit();
String inputFileName = inputFile.getAbsolutePath();
- if (pig.getPigContext().getExecType() == ExecType.LOCAL) {
+ if (pig.getPigContext().getExecType().isLocal()) {
PrintStream ps = new PrintStream(new FileOutputStream(inputFile));
for (String line : inputLines) {
ps.println(line);
@@ -171,9 +169,11 @@ public class TestCombiner {
}
}
Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
- Properties props = cluster.getProperties();
- props.setProperty("io.sort.mb", "1");
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
+ String oldValue = properties.getProperty("io.sort.mb");
+ properties.setProperty("io.sort.mb", "1");
+
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+ pigServer.getPigContext().getProperties().setProperty("mapred.child.java.opts", "-Xmx1024m");
pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
@@ -198,6 +198,13 @@ public class TestCombiner {
assertFalse(it.hasNext());
Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
+ // Reset io.sort.mb to the original value before exit
+ if (oldValue == null) {
+ properties.remove("io.sort.mb");
+ } else {
+ properties.setProperty("io.sort.mb", oldValue);
+ }
+ pigServer.shutdown();
}
@Test
@@ -213,7 +220,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -237,14 +244,14 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
assertEquals(expected[i++], field);
}
}
Util.deleteFile(cluster, "distinctAggs1Input.txt");
-
+ pigServer.shutdown();
}
@Test
@@ -260,7 +267,7 @@ public class TestCombiner {
};
Util.createInputFile(cluster, "testGroupElements.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
pigServer.registerQuery("b = group a by (str, num1);");
@@ -305,7 +312,7 @@ public class TestCombiner {
Util.checkQueryOutputsAfterSort(it, expectedRes);
Util.deleteFile(cluster, "distinctAggs1Input.txt");
-
+ pigServer.shutdown();
}
@Test
@@ -321,7 +328,7 @@ public class TestCombiner {
};
Util.createInputFile(cluster, "testGroupLimit.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'testGroupLimit.txt' using PigStorage(' ') " +
"as (str:chararray, num1:int) ;");
pigServer.registerQuery("b = group a by str;");
@@ -341,7 +348,7 @@ public class TestCombiner {
Iterator<Tuple> it = pigServer.openIterator("d");
Util.checkQueryOutputsAfterSort(it, expectedRes);
-
+ pigServer.shutdown();
}
private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected)
@@ -370,7 +377,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -391,7 +398,7 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
@@ -403,7 +410,7 @@ public class TestCombiner {
}
}
Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
-
+ pigServer.shutdown();
}
@Test
@@ -421,7 +428,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -442,7 +449,7 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
@@ -454,7 +461,7 @@ public class TestCombiner {
}
}
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
-
+ pigServer.shutdown();
}
@Test
@@ -480,7 +487,7 @@ public class TestCombiner {
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");
@@ -494,6 +501,7 @@ public class TestCombiner {
Iterator<Tuple> it = pigServer.openIterator("c");
Util.checkQueryOutputsAfterSortRecursive(it, expected,
"group:chararray,age:long,b:{t:(name:chararray,age:int,gpa:double)}");
+ pigServer.shutdown();
} finally {
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
}
@@ -501,6 +509,7 @@ public class TestCombiner {
public static class JiraPig1030 extends EvalFunc<DataBag> {
+ @Override
public DataBag exec(Tuple input) throws IOException {
return new DefaultDataBag();
}
@@ -524,7 +533,7 @@ public class TestCombiner {
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b {" +
@@ -536,6 +545,7 @@ public class TestCombiner {
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+ pigServer.shutdown();
} finally {
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
}
Modified: pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java Fri May 30 19:07:23 2014
@@ -24,10 +24,10 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.FileOutputStream;
import java.util.Iterator;
+import java.util.Properties;
import java.util.Random;
import java.util.zip.GZIPOutputStream;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.DIFF;
import org.apache.pig.data.BagFactory;
@@ -37,13 +37,16 @@ import org.apache.pig.test.utils.TestHel
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestCompressedFiles {
- static MiniCluster cluster = MiniCluster.buildCluster();
+ private static PigServer pig;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
- File datFile;
- File gzFile;
+ private File datFile;
+ private File gzFile;
@Before
public void setUp() throws Exception {
@@ -74,6 +77,12 @@ public class TestCompressedFiles {
gzFile.delete();
}
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
@@ -81,7 +90,7 @@ public class TestCompressedFiles {
@Test
public void testCompressed1() throws Throwable {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("A = foreach (cogroup (load '"
+ Util.generateURI(gzFile.toString(), pig.getPigContext())
+ "') by $1, (load '"
@@ -94,7 +103,7 @@ public class TestCompressedFiles {
@Test
public void testCompressed2() throws Throwable {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("A = load '"
+ Util.generateURI(gzFile.toString(), pig.getPigContext())
+ "';");