You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/05/08 22:50:06 UTC
svn commit: r1593413 [2/2] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/ test/
test/org/apache/pig/test/
Modified: pig/branches/tez/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestStreaming.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestStreaming.java Thu May 8 20:50:06 2014
@@ -17,6 +17,8 @@
*/
package org.apache.pig.test;
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -24,7 +26,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.PigToStream;
import org.apache.pig.backend.executionengine.ExecException;
@@ -41,33 +42,31 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
-
public class TestStreaming {
- private static final MiniCluster cluster = MiniCluster.buildCluster();
-
+ private static final MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
private PigServer pigServer;
-
+
@Before
public void setup() throws ExecException {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
}
-
+
@After
public void tearDown() {
pigServer = null;
}
-
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
private TupleFactory tf = TupleFactory.getInstance();
private static final String simpleEchoStreamingCommand;
-
+
static {
String quote = "'";
if (Util.WINDOWS) {
@@ -79,20 +78,20 @@ public class TestStreaming {
private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException {
Assert.assertEquals(firstField.length, secondField.length);
-
+
Tuple[] expectedResults = new Tuple[firstField.length];
for (int i=0; i < expectedResults.length; ++i) {
expectedResults[i] = tf.newTuple(2);
expectedResults[i].set(0, firstField[i]);
expectedResults[i].set(1, secondField[i]);
}
-
+
return expectedResults;
}
-
+
@Test
public void testSimpleMapSideStreaming() throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
"A,5", "B,5", "C,8", "A,8",
"D,8", "A,9"});
@@ -104,14 +103,14 @@ public class TestStreaming {
for (int i = 0; i < withTypes.length; i++) {
Tuple[] expectedResults = null;
if(withTypes[i] == true) {
- expectedResults =
+ expectedResults =
setupExpectedResults(expectedFirstFields, expectedSecondFields);
} else {
- expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
}
-
+
// Pig query to run
pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
@@ -127,16 +126,16 @@ public class TestStreaming {
pigServer.registerQuery("OP = stream S1 through `" +
simpleEchoStreamingCommand + "`;");
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
-
+
@Test
- public void testSimpleMapSideStreamingWithOutputSchema()
+ public void testSimpleMapSideStreamingWithOutputSchema()
throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
"A,5", "B,5", "C,8", "A,8",
"D,8", "A,9"});
@@ -144,16 +143,16 @@ public class TestStreaming {
// Expected results
Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};
-
+
boolean[] withTypes = {true, false};
for (int i = 0; i < withTypes.length; i++) {
Tuple[] expectedResults = null;
if(withTypes[i] == true) {
- expectedResults =
+ expectedResults =
setupExpectedResults(expectedFirstFields, expectedSecondFields);
} else {
- expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
}
// Pig query to run
@@ -170,16 +169,16 @@ public class TestStreaming {
simpleEchoStreamingCommand + "` as (f0, f1);");
}
pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");
-
+
// Run the query and check the results
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
@Test
- public void testSimpleReduceSideStreamingAfterFlatten()
+ public void testSimpleReduceSideStreamingAfterFlatten()
throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
"A,5", "B,5", "C,8", "A,8",
"D,8", "A,9"});
@@ -191,11 +190,11 @@ public class TestStreaming {
for (int i = 0; i < withTypes.length; i++) {
Tuple[] expectedResults = null;
if(withTypes[i] == true) {
- expectedResults =
+ expectedResults =
setupExpectedResults(expectedFirstFields, expectedSecondFields);
} else {
- expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
}
@@ -217,7 +216,7 @@ public class TestStreaming {
pigServer.registerQuery("OP = stream S1 through `" +
simpleEchoStreamingCommand + "`;");
}
-
+
// Run the query and check the results
Util.checkQueryOutputsAfterSort(pigServer.openIterator("OP"), expectedResults);
}
@@ -225,7 +224,7 @@ public class TestStreaming {
@Test
public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1,2,3", "B,2,4,5",
"C,3,1,2", "D,2,5,2",
"A,5,5,1", "B,5,7,4",
@@ -234,7 +233,7 @@ public class TestStreaming {
);
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
@@ -268,7 +267,7 @@ public class TestStreaming {
simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("OP = stream S3 through `" +
simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);");
-
+
// Run the query and check the results
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
@@ -276,14 +275,14 @@ public class TestStreaming {
@Test
public void testInputShipSpecs() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -295,29 +294,29 @@ public class TestStreaming {
};
File command1 = Util.createInputFile("script", "pl", script);
File command2 = Util.createInputFile("script", "pl", script);
-
+
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "B", "C", "A", "D", "A"};
Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
-
+
pigServer.registerQuery(
"define CMD1 `perl " + command1.getName() + " foo` " +
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output(stdout using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
+ "stderr();");
pigServer.registerQuery(
"define CMD2 `perl " + command2.getName() + " bar` " +
"ship ('" + Util.encodeEscape(command2.toString()) + "') " +
"input('bar' using " + PigStreaming.class.getName() + "(',')) " +
- "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
+ "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
+ "stderr();");
pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
pigServer.getPigContext())
@@ -326,17 +325,17 @@ public class TestStreaming {
pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
"through CMD1;");
pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
-
+
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
// Run the query and check the results
@@ -345,14 +344,14 @@ public class TestStreaming {
@Test
public void testInputShipSpecsWithUDFDefine() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -364,13 +363,13 @@ public class TestStreaming {
};
File command1 = Util.createInputFile("script", "pl", script);
File command2 = Util.createInputFile("script", "pl", script);
-
+
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "B", "C", "A", "D", "A"};
Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
@@ -381,31 +380,31 @@ public class TestStreaming {
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using PS )" +
"output(stdout using PS ) " +
- "stderr();");
+ "stderr();");
pigServer.registerQuery(
"define CMD2 `perl " + command2.getName() + " bar` " +
"ship ('" + Util.encodeEscape(command2.toString()) + "') " +
"input('bar' using PS ) " +
- "output(stdout using PS ) " +
- "stderr();");
- pigServer.registerQuery("IP = load '"
+ "output(stdout using PS ) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
pigServer.getPigContext()) + "' using PigStorage(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
"through CMD1;");
pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
-
+
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
// Run the query and check the results
@@ -413,15 +412,15 @@ public class TestStreaming {
}
@Test
- public void testInputCacheSpecs() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ public void testInputCacheSpecs() throws Exception {
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -434,17 +433,17 @@ public class TestStreaming {
// Copy the scripts to HDFS
File command1 = Util.createInputFile("script", "pl", script);
File command2 = Util.createInputFile("script", "pl", script);
- String c1 = FileLocalizer.hadoopify(command1.toString(),
+ String c1 = FileLocalizer.hadoopify(command1.toString(),
pigServer.getPigContext());
- String c2 = FileLocalizer.hadoopify(command2.toString(),
+ String c2 = FileLocalizer.hadoopify(command2.toString(),
pigServer.getPigContext());
-
+
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "B", "C", "A", "D", "A"};
Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
- Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
@@ -452,12 +451,12 @@ public class TestStreaming {
"define CMD1 `perl script1.pl foo` " +
"cache ('" + c1 + "#script1.pl') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
+ "stderr();");
pigServer.registerQuery(
"define CMD2 `perl script2.pl bar` " +
"cache ('" + c2 + "#script2.pl') " +
"input('bar' using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
+ "stderr();");
pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
pigServer.getPigContext()) + "' using "
@@ -465,34 +464,34 @@ public class TestStreaming {
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
"through CMD1;");
- pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+ pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
}
@Test
public void testOutputShipSpecs() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
@@ -506,11 +505,11 @@ public class TestStreaming {
File command = Util.createInputFile("script", "pl", script);
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "A", "A", "A", "A", "A"};
Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
- Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
@@ -519,40 +518,40 @@ public class TestStreaming {
"ship ('" + Util.encodeEscape(command.toString()) + "') " +
"output('foo' using " + PigStreaming.class.getName() + "(','), " +
"'bar' using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
- pigServer.registerQuery("IP = load '"
+ "stderr();");
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
- pigServer.getPigContext()) + "' using "
+ pigServer.getPigContext()) + "' using "
+ PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
- pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
-
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
}
@Test
public void testOutputShipSpecsWithUDFDefine() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
@@ -566,11 +565,11 @@ public class TestStreaming {
File command = Util.createInputFile("script", "pl", script);
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "A", "A", "A", "A", "A"};
Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
- Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
@@ -582,39 +581,39 @@ public class TestStreaming {
"ship ('" + Util.encodeEscape(command.toString()) + "') " +
"output('foo' using PS, " +
"'bar' using PS) " +
- "stderr();");
- pigServer.registerQuery("IP = load '"
+ "stderr();");
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
pigServer.getPigContext()) + "' using PigStorage(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
- pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
-
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
}
-
+
@Test
public void testInputOutputSpecs() throws Exception {
- File input = Util.createInputFile("tmp", "",
- new String[] {"A,1", "B,2", "C,3",
- "D,2", "A,5", "B,5",
- "C,8", "A,8", "D,8",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
"A,9"});
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -630,11 +629,11 @@ public class TestStreaming {
File command = Util.createInputFile("script", "pl", script);
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "B", "C", "A", "D", "A"};
Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
- Tuple[] expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
pigServer.registerQuery(
@@ -643,79 +642,79 @@ public class TestStreaming {
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output('bar', " +
"'foobar' using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();");
- pigServer.registerQuery("IP = load '"
+ "stderr();");
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
- pigServer.getPigContext()) + "' using "
+ pigServer.getPigContext()) + "' using "
+ PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
- pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
-
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
String output = "/pig/out";
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-
+
pigServer.registerQuery("A = load '" + output + "/foobar" + "' using PigStorage(',');");
Iterator<Tuple> iter = pigServer.openIterator("A");
-
+
List<Tuple> outputs = new ArrayList<Tuple>();
while (iter.hasNext()) {
- outputs.add(iter.next());
+ outputs.add(iter.next());
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-
+
// Cleanup
pigServer.deleteFile(output);
}
@Test
- public void testSimpleMapSideStreamingWithUnixPipes()
+ public void testSimpleMapSideStreamingWithUnixPipes()
throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
"A,5", "B,5", "C,8", "A,8",
"D,8", "A,9"});
// Expected results
- String[] expectedFirstFields =
+ String[] expectedFirstFields =
new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
Integer[] expectedSecondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
boolean[] withTypes = {true, false};
for (int i = 0; i < withTypes.length; i++) {
Tuple[] expectedResults = null;
if(withTypes[i] == true) {
- expectedResults =
+ expectedResults =
setupExpectedResults(expectedFirstFields, expectedSecondFields);
} else {
- expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
}
// Pig query to run
- pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
+ pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
" | " + simpleEchoStreamingCommand + "`;");
- pigServer.registerQuery("IP = load '"
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
- pigServer.getPigContext()) + "' using "
+ pigServer.getPigContext()) + "' using "
+ PigStorage.class.getName() + "(',');");
if(withTypes[i] == true) {
pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);");
} else {
- pigServer.registerQuery("OP = stream IP through CMD;");
+ pigServer.registerQuery("OP = stream IP through CMD;");
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
@Test
- public void testNegativeLoadStoreOptimization()
+ public void testNegativeLoadStoreOptimization()
throws Exception {
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"A,1", "B,2", "C,3", "D,2",
"A,5", "B,5", "C,8", "A,8",
"D,8", "A,9"});
@@ -727,20 +726,20 @@ public class TestStreaming {
for (int i = 0; i < withTypes.length; i++) {
Tuple[] expectedResults = null;
if(withTypes[i] == true) {
- expectedResults =
+ expectedResults =
setupExpectedResults(expectedFirstFields, expectedSecondFields);
} else {
- expectedResults =
- setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
+ expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
Util.toDataByteArrays(expectedSecondFields));
}
// Pig query to run
- pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
+ pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
"` input(stdin using " + PigStreamDump.class.getName() + ");");
- pigServer.registerQuery("IP = load '"
+ pigServer.registerQuery("IP = load '"
+ Util.generateURI(Util.encodeEscape(input.toString()),
- pigServer.getPigContext()) + "' using "
+ pigServer.getPigContext()) + "' using "
+ PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
if(withTypes[i] == true) {
@@ -748,18 +747,18 @@ public class TestStreaming {
simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
} else {
pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
- simpleEchoStreamingCommand + "`;");
+ simpleEchoStreamingCommand + "`;");
}
-
+
// Run the query and check the results
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
-
+
@Test
public void testNegativeMultipleInput() throws IOException {
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -770,14 +769,14 @@ public class TestStreaming {
"}",
};
File command1 = Util.createInputFile("script", "pl", script);
- String query =
+ String query =
"define CMD1 `perl " + command1.getName() + " foo` " +
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output(stdout using " + PigStreaming.class.getName() + "(',')) " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
- "stderr();";
-
+ "stderr();";
+
try {
pigServer.registerQuery( query );
} catch(FrontendException ex) {
@@ -786,16 +785,16 @@ public class TestStreaming {
Assert.assertTrue( ex.getMessage().contains( expectedMsg ) );
return;
}
-
+
Assert.fail( "Testcase is supposed to fail." );
}
-
+
@Test
public void testStreamingStderrLogsShouldNotBePersistedByDefault() throws Exception {
Util.createInputFile(cluster, "mydummyinput.txt", new String[] { "dummy"});
- PigServer pig = new PigServer(ExecType.MAPREDUCE,cluster.getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(),cluster.getProperties());
pig.setBatchOn();
pig.registerQuery("define mycmd `echo dummy` ;");
@@ -887,6 +886,6 @@ public class TestStreaming {
public byte[] serialize(Tuple t) throws IOException {
return (TupleFormat.format(t) + recordDelimiter).getBytes();
}
-
+
}
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestUDF.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestUDF.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestUDF.java Thu May 8 20:50:06 2014
@@ -55,11 +55,11 @@ public class TestUDF {
static File TempScriptFile = null;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@Before
public void setUp() throws Exception {
- FileLocalizer.setInitialized(false);
+ Util.resetStateForExecModeSwitch();
TempScriptFile = File.createTempFile("temp_jira_851", ".pig");
FileWriter writer = new FileWriter(TempScriptFile);
for (String line : ScriptStatement) {
@@ -82,6 +82,7 @@ public class TestUDF {
Iterator<Tuple> iterator = pig.openIterator("B");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
+ @SuppressWarnings("unchecked")
Map<Object, Object> result = (Map<Object, Object>) tuple.get(0);
assertEquals(result, MyUDFReturnMap.map);
}
@@ -92,7 +93,7 @@ public class TestUDF {
Util.createInputFile(cluster, "a.txt", new String[] { "dummy",
"dummy" });
FileLocalizer.deleteTempFiles();
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+ PigServer pig = new PigServer(cluster.getExecType(), cluster
.getProperties());
pig.registerQuery("A = LOAD 'a.txt';");
pig.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.utils.MyUDFReturnMap();");
@@ -100,6 +101,7 @@ public class TestUDF {
Iterator<Tuple> iterator = pig.openIterator("B");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
+ @SuppressWarnings("unchecked")
Map<Object, Object> result = (Map<Object, Object>) tuple.get(0);
assertEquals(result, MyUDFReturnMap.map);
}
@@ -107,7 +109,7 @@ public class TestUDF {
@Test
public void testUDFMultiLevelOutputSchema() throws Exception {
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
pig.registerQuery("A = LOAD 'a.txt';");
pig.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.utils.MultiLevelDerivedUDF1();");
pig.registerQuery("C = FOREACH A GENERATE org.apache.pig.test.utils.MultiLevelDerivedUDF2();");
@@ -211,7 +213,7 @@ public class TestUDF {
pig.dumpSchema("c");
pig.dumpSchema("d");
}
-
+
@Test
public void testEvalFuncGetVarArgToFunc() throws Exception {
String input = "udf_test_jira_3444.txt";
@@ -254,6 +256,7 @@ public class TestUDF {
return schemaString;
}
+ @Override
public Schema outputSchema(Schema input) {
return schema;
}
@@ -287,7 +290,7 @@ public class TestUDF {
return l;
}
}
-
+
public static class UdfWithFuncSpecWithVarArgs extends EvalFunc<Integer> {
public UdfWithFuncSpecWithVarArgs() {}
@@ -302,7 +305,7 @@ public class TestUDF {
}
return res;
}
-
+
@Override
public SchemaType getSchemaType() {
return SchemaType.VARARG;
Modified: pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java Thu May 8 20:50:06 2014
@@ -17,7 +17,9 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.FileWriter;
import java.util.Iterator;
@@ -29,24 +31,10 @@ import org.apache.pig.builtin.PigStorage
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.UDFContext;
-import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.Test;
public class TestUDFContext {
-
- static MiniCluster cluster = null;
-
- @Before
- public void setUp() throws Exception {
- cluster = MiniCluster.buildCluster();
- }
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
@Test
public void testUDFContext() throws Exception {
File a = Util.createLocalInputFile("a.txt", new String[] { "dumb" });
@@ -67,7 +55,7 @@ public class TestUDFContext {
writer.write(line + "\n");
}
writer.close();
-
+
pig.registerScript(tmpFile.getAbsolutePath());
Iterator<Tuple> iterator = pig.openIterator("D");
while (iterator.hasNext()) {
@@ -81,26 +69,26 @@ public class TestUDFContext {
assertEquals(tuple.get(3).toString(), "five");
}
}
-
-
+
+
/**
- * Test that UDFContext is reset each time the plan is regenerated
+ * Test that UDFContext is reset each time the plan is regenerated
* @throws Exception
*/
@Test
public void testUDFContextReset() throws Exception {
PigServer pig = new PigServer(ExecType.LOCAL);
pig.registerQuery(" l = load 'file' as (a :int, b : int, c : int);");
- pig.registerQuery(" f = foreach l generate a, b;");
+ pig.registerQuery(" f = foreach l generate a, b;");
pig.explain("f", System.out);
Properties props = UDFContext.getUDFContext().getUDFProperties(PigStorage.class);
// required fields property should be set because f results does not
// require the third column c, so properties should not be null
assertTrue(
- "properties in udf context for load should not be empty: "+props,
+ "properties in udf context for load should not be empty: "+props,
props.keySet().size()>0);
-
+
// the new statement for alias f below will require all columns,
// so this time required fields property for loader should not be set
pig.registerQuery(" f = foreach l generate a, b, c;");
@@ -108,10 +96,10 @@ public class TestUDFContext {
props = UDFContext.getUDFContext().getUDFProperties(PigStorage.class);
assertTrue(
- "properties in udf context for load should be empty: "+props,
+ "properties in udf context for load should be empty: "+props,
props.keySet().size() == 0);
-
+
}
-
+
}
Modified: pig/branches/tez/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/Util.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/Util.java Thu May 8 20:50:06 2014
@@ -45,8 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -105,6 +103,8 @@ import org.apache.pig.newplan.logical.vi
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.junit.Assert;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -486,10 +486,8 @@ public class Util {
*/
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
Tuple[] expectedResults) {
- for (Tuple expected : expectedResults) {
- Tuple actual = actualResults.next();
- Assert.assertEquals(expected.toString(), actual.toString());
- }
+ checkQueryOutputs(actualResults, Arrays.asList(expectedResults));
+
}
/**
@@ -501,8 +499,13 @@ public class Util {
*/
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
List<Tuple> expectedResults) {
-
- checkQueryOutputs(actualResults,expectedResults.toArray(new Tuple[expectedResults.size()]));
+ for (Tuple expected : expectedResults) {
+ Assert.assertTrue("fewer results than expected, missing: " + expected, actualResults.hasNext());
+ Tuple actual = actualResults.next();
+ Assert.assertEquals(expected.toString(), actual.toString());
+ }
+ // A per-expected-tuple counter can never disagree with size(); check the iterator is exhausted instead.
+ Assert.assertFalse("more results than expected", actualResults.hasNext());
}
/**
@@ -1288,7 +1291,7 @@ public class Util {
}
/**
- *
+ *
* @param expected
* Exception class that is expected to be thrown
* @param found
@@ -1301,4 +1304,18 @@ public class Util {
assertEquals(expected, found.getClass());
assertEquals(found.getMessage(), message);
}
+
+ /**
+ * Called to reset ThreadLocal or static states that PigServer depends on
+ * when a test suite has testcases switching between LOCAL and MAPREDUCE/TEZ
+ * execution modes
+ */
+ public static void resetStateForExecModeSwitch() {
+ FileLocalizer.setInitialized(false);
+ // TODO: once we have Tez local mode, we can get rid of this. For now,
+ // if we run this test suite in Tez mode and there are some tests
// in LOCAL mode, we need to set ScriptState to
// null to force ScriptState to be re-initialized every time.
+ ScriptState.start(null);
+ }
}
Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Thu May 8 20:50:06 2014
@@ -9,13 +9,28 @@
**/TestCompressedFiles.java
**/TestCustomPartitioner.java
**/TestEvalPipeline.java
+**/TestFilterUDF.java
+**/TestFinish.java
+**/TestForEachNestedPlan.java
+**/TestLoad.java
+**/TestLocalRearrange.java
+**/TestMapReduce.java
+**/TestMapReduce2.java
**/TestNestedForeach.java
**/TestPigContext.java
**/TestPigServer.java
**/TestPigStorage.java
-**/TestSecondarySortTez.java
+**/TestSample.java
+**/TestSchema.java
+**/TestScriptLanguageJavaScript.java
+**/TestScriptUDF.java
**/TestSkewedJoin.java
**/TestSplitStore.java
+**/TestStoreOld.java
+**/TestStreaming.java
+**/TestUDF.java
+**/TestUDFContext.java
+**/TestSecondarySortTez.java
**/TestTezCompiler.java
**/TestTezJobControlCompiler.java
-**/TestTezLauncher.java
+**/TestTezLauncher.java
\ No newline at end of file