You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2012/10/31 22:32:12 UTC
svn commit: r1404362 - in /pig/trunk: ./ test/org/apache/pig/test/
Author: daijy
Date: Wed Oct 31 21:32:12 2012
New Revision: 1404362
URL: http://svn.apache.org/viewvc?rev=1404362&view=rev
Log:
PIG-2798: pig streaming tests assume interpreters are auto-resolved (jgordon via daijy)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
pig/trunk/test/org/apache/pig/test/TestSample.java
pig/trunk/test/org/apache/pig/test/TestStreaming.java
pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 31 21:32:12 2012
@@ -523,9 +523,11 @@ Release 0.10.1 - Unreleased
BUG FIXES
+PIG-2798: pig streaming tests assume interpreters are auto-resolved (jgordon via daijy)
+
PIG-2795: Fix test cases that generate pig scripts with "load " + pathStr to encode "\" in the path (jgordon via daijy)
-PIG-2796: Local temporary paths are not always valid HDFS path names (jgordon via daijy)
+PIG-2796: Local temporary paths are not always valid HDFS path names (jgordon via daijy)
Release 0.10.0
Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Wed Oct 31 21:32:12 2012
@@ -1021,47 +1021,74 @@ public class TestGrunt {
try {
PigServer server = new PigServer(ExecType.MAPREDUCE,cluster.getProperties());
PigContext context = server.getPigContext();
+
+ String strQuote = "'";
+ String strRemoveFile = "rm";
+ String strRemoveDir = "rmdir";
+
+ if (Util.WINDOWS)
+ {
+ strQuote = "\"";
+ strRemoveFile = "del";
+ strRemoveDir = "rd";
+ }
String strCmd = "sh mkdir test_shell_tmp;";
+ // Create a temp directory via command and make sure it exists
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertTrue(new File("test_shell_tmp").exists());
- strCmd = "sh rmdir test_shell_tmp;";
+ // Remove the temp directory via shell and make sure it is gone
+ strCmd = "sh " + strRemoveDir + " test_shell_tmp;";
+
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("test_shell_tmp").exists());
- strCmd = "sh bash -c 'echo hello world > tempShFileToTestShCommand'";
+ // Verify pipes are usable in the command context by piping data to a file
+ strCmd = "sh echo hello world > tempShFileToTestShCommand";
+
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
BufferedReader fileReader = null;
fileReader = new BufferedReader(new FileReader("tempShFileToTestShCommand"));
- assertTrue(fileReader.readLine().equalsIgnoreCase("hello world"));
+ assertTrue(fileReader.readLine().trim().equals("hello world"));
+
fileReader.close();
- strCmd = "sh rm tempShFileToTestShCommand";
+
+ // Remove the file via cmd and make sure it is gone
+ strCmd = "sh " + strRemoveFile + " tempShFileToTestShCommand";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("tempShFileToTestShCommand").exists());
- strCmd = "sh bash -c 'touch TouchedFileInsideGrunt_61 && ls | grep TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71'";
+ if (Util.WINDOWS) {
+ strCmd = "sh echo foo > TouchedFileInsideGrunt_61 | dir /B | findstr TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71";
+ }
+ else {
+ strCmd = "sh touch TouchedFileInsideGrunt_61 | ls | grep TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71";
+ }
+
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
fileReader = new BufferedReader(new FileReader("fileContainingTouchedFileInsideGruntShell_71"));
- assertTrue(fileReader.readLine().equals("TouchedFileInsideGrunt_61"));
+ assertTrue(fileReader.readLine().trim().equals("TouchedFileInsideGrunt_61"));
+
fileReader.close();
- strCmd = "sh bash -c 'rm fileContainingTouchedFileInsideGruntShell_71'";
+ strCmd = "sh " + strRemoveFile+" fileContainingTouchedFileInsideGruntShell_71";
+
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Wed Oct 31 21:32:12 2012
@@ -78,8 +78,8 @@ public class TestPruneColumn extends Tes
private static final String simpleEchoStreamingCommand;
static {
- if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
- simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+ if (Util.WINDOWS)
+ simpleEchoStreamingCommand = "perl -ne \"print ^\"$_^\"\"";
else
simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
}
@@ -266,7 +266,7 @@ public class TestPruneColumn extends Tes
@Test
public void testLoadForEach1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -289,7 +289,7 @@ public class TestPruneColumn extends Tes
@Test
public void testLoadForEach2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -312,7 +312,7 @@ public class TestPruneColumn extends Tes
@Test
public void testLoadForEach3() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -335,8 +335,8 @@ public class TestPruneColumn extends Tes
@Test
public void testJoin1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
@@ -357,8 +357,8 @@ public class TestPruneColumn extends Tes
@Test
public void testJoin2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
@@ -379,7 +379,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForEachFilter() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate a0, a1;");
@@ -398,7 +398,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForEach1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1+a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -422,7 +422,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForEach2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0 as b0, *;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -450,7 +450,7 @@ public class TestPruneColumn extends Tes
@Test
public void testSplit1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
@@ -468,7 +468,7 @@ public class TestPruneColumn extends Tes
@Test
public void testSplit2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
@@ -486,7 +486,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForeachNoSchema1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -509,7 +509,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForeachNoSchema2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -532,8 +532,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by $1, B by $1;");
pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -563,7 +563,7 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -579,7 +579,7 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A by $1;");
pigServer.registerQuery("C = foreach B generate $1, '1';");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -605,8 +605,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -639,7 +639,7 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by (a0, a1);");
pigServer.registerQuery("C = foreach B generate flatten(group);");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -665,7 +665,7 @@ public class TestPruneColumn extends Tes
@Test
public void testDistinct1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -683,7 +683,7 @@ public class TestPruneColumn extends Tes
@Test
public void testStream1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -707,7 +707,7 @@ public class TestPruneColumn extends Tes
@Test
public void testBinCond1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -731,8 +731,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup6() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -760,8 +760,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup7() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -791,8 +791,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCross1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -835,8 +835,8 @@ public class TestPruneColumn extends Tes
@Test
public void testUnion1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = union A, B;");
pigServer.registerQuery("D = foreach C generate $0, $2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -876,8 +876,8 @@ public class TestPruneColumn extends Tes
@Test
public void testFRJoin1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -904,7 +904,7 @@ public class TestPruneColumn extends Tes
@Test
public void testFilter1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a1;");
pigServer.registerQuery("C = limit B 10;");
pigServer.registerQuery("D = foreach C generate $0;");
@@ -929,7 +929,7 @@ public class TestPruneColumn extends Tes
@Test
public void testFilter2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by a0+a2 == 4;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -953,7 +953,7 @@ public class TestPruneColumn extends Tes
@Test
public void testOrderBy1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -977,7 +977,7 @@ public class TestPruneColumn extends Tes
@Test
public void testOrderBy2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1001,7 +1001,7 @@ public class TestPruneColumn extends Tes
@Test
public void testCogroup8() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1025,8 +1025,8 @@ public class TestPruneColumn extends Tes
@Test
public void testJoin3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by *, B by * using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1050,7 +1050,7 @@ public class TestPruneColumn extends Tes
@Test
public void testLoadForEach4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1074,7 +1074,7 @@ public class TestPruneColumn extends Tes
@Test
public void testForEachUDF() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("B = foreach A generate StringSize(*);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1098,8 +1098,8 @@ public class TestPruneColumn extends Tes
@Test
public void testOutJoin1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile6.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("C = join A by $0 left, B by $0;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1128,7 +1128,7 @@ public class TestPruneColumn extends Tes
@Test
public void testFilter3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1152,7 +1152,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKey1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a0, a1#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1176,7 +1176,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKey2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
@@ -1202,7 +1202,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKey3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = group B all;");
@@ -1219,7 +1219,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKey4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = limit A 10;");
pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
@@ -1244,7 +1244,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKey5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
@@ -1269,7 +1269,7 @@ public class TestPruneColumn extends Tes
@Test
public void testMapKeyInSplit1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = foreach A generate m#'key1' as key1;");
pigServer.registerQuery("C = foreach A generate m#'key2' as key2;");
pigServer.registerQuery("D = join B by key1, C by key2;");
@@ -1290,7 +1290,7 @@ public class TestPruneColumn extends Tes
@SuppressWarnings("rawtypes")
@Test
public void testMapKeyInSplit2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = filter A by m#'cond'==1;");
pigServer.registerQuery("C = filter B by m#'key1'==1;");
pigServer.registerQuery("D = filter B by m#'key2'==2;");
@@ -1315,7 +1315,7 @@ public class TestPruneColumn extends Tes
@Test
public void testConstantPlan() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate 1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1339,7 +1339,7 @@ public class TestPruneColumn extends Tes
@Test
public void testPlainPlan() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1368,10 +1368,12 @@ public class TestPruneColumn extends Tes
// get a temp intermediate filename
File intermediateFile = File.createTempFile("intemediate", "txt");
intermediateFile.delete(); // delete since we don't want the file to be present
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
- pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+ String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
- pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.store("A", clusterPath, "BinStorage()");
+
+ pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
+ "' using BinStorage() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;");
@@ -1395,13 +1397,14 @@ public class TestPruneColumn extends Tes
@Test
public void testBinStorage2() throws Exception {
- // get a temp intermediate filename
File intermediateFile = File.createTempFile("intemediate", "txt");
intermediateFile.delete(); // delete since we don't want the file to be present
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
- pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+ String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
+
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.store("A", clusterPath, "BinStorage()");
- pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+ pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
+ "' using BinStorage() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
@@ -1424,13 +1427,12 @@ public class TestPruneColumn extends Tes
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
-
}
@Test
public void testProjectCastKeyLookup() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+ "' as (a0, a1);");
pigServer.registerQuery("B = foreach A generate a1#'key1';");
@@ -1451,11 +1453,12 @@ public class TestPruneColumn extends Tes
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key1]"}));
+
}
@Test
public void testRelayFlattenMap() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+ "' as (a0, a1:map[]);");
pigServer.registerQuery("B = foreach A generate flatten(a1);");
@@ -1481,8 +1484,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCrossAtLeastOneColumnOneInput() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0;");
@@ -1519,8 +1522,8 @@ public class TestPruneColumn extends Tes
@Test
public void testComplex1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile7.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile8.toString()), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;");
pigServer.registerQuery("C = join A by $0, B1 by $0;");
pigServer.registerQuery("D = order C by $4;");
@@ -1544,8 +1547,8 @@ public class TestPruneColumn extends Tes
@Test
public void testCoGroup8() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $0, $1;");
@@ -1577,7 +1580,7 @@ public class TestPruneColumn extends Tes
// See PIG-1128
@Test
public void testUserDefinedSchema() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
@@ -1599,7 +1602,7 @@ public class TestPruneColumn extends Tes
// See PIG-1127
@Test
public void testSharedSchemaObject() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile10.toString()), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
pigServer.registerQuery("B = foreach A generate a1;");
pigServer.registerQuery("C = limit B 10;");
@@ -1617,8 +1620,8 @@ public class TestPruneColumn extends Tes
// See PIG-1142
@Test
public void testJoin4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by a2, B by b2;");
pigServer.registerQuery("D = foreach C generate $0, $1, $2;");
@@ -1642,7 +1645,7 @@ public class TestPruneColumn extends Tes
@Test
public void testFilter4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate $2;");
@@ -1659,7 +1662,7 @@ public class TestPruneColumn extends Tes
@Test
public void testSplit3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
pigServer.registerQuery("C = foreach B generate $2;");
@@ -1676,7 +1679,7 @@ public class TestPruneColumn extends Tes
@Test
public void testOrderBy3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a2;");
pigServer.registerQuery("C = foreach B generate a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1700,9 +1703,9 @@ public class TestPruneColumn extends Tes
@Test
public void testCogroup9() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
- pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
pigServer.registerQuery("E = foreach D generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1727,8 +1730,8 @@ public class TestPruneColumn extends Tes
// See PIG-1165
@Test
public void testOrderbyWrongSignature() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = order A by a1;");
pigServer.registerQuery("D = join C by a1, B by b0;");
pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
@@ -1748,8 +1751,8 @@ public class TestPruneColumn extends Tes
// See PIG-1146
@Test
public void testUnionMixedPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b2);");
pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
pigServer.registerQuery("D = union A, C;");
pigServer.registerQuery("E = foreach D generate $0, $2;");
@@ -1792,9 +1795,9 @@ public class TestPruneColumn extends Tes
// See PIG-1176
@Test
public void testUnionMixedSchemaPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;;");
- pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "';");
pigServer.registerQuery("D = foreach C generate $0;");
pigServer.registerQuery("E = union B, D;");
Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1838,7 +1841,7 @@ public class TestPruneColumn extends Tes
public void testForEachFlatten() throws Exception {
File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
- pigServer.registerQuery("A = load '"+inputFile.toString()+"' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});");
+ pigServer.registerQuery("A = load '"+Util.encodeEscape(inputFile.toString())+"' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});");
pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), flatten(a3), 10;");
pigServer.registerQuery("C = foreach B generate a0, $4;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1867,7 +1870,7 @@ public class TestPruneColumn extends Tes
// See PIG-1210
@Test
public void testFieldsToReadDuplicatedEntry() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1887,7 +1890,7 @@ public class TestPruneColumn extends Tes
// See PIG-1272
@Test
public void testSplit4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;");
pigServer.registerQuery("C = join A by a0, B by a0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1905,7 +1908,7 @@ public class TestPruneColumn extends Tes
@Test
public void testSplit5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile11.toString()), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
pigServer.registerQuery("C = join A by a0, B by a0;");
pigServer.registerQuery("D = filter C by A::a1>=B::a1;");
@@ -1927,7 +1930,7 @@ public class TestPruneColumn extends Tes
// See PIG-1493
@Test
public void testInconsistentPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
pigServer.registerQuery("B = foreach A generate CONCAT(a0,a1) as b0, a0, a2;");
pigServer.registerQuery("C = foreach B generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1949,12 +1952,12 @@ public class TestPruneColumn extends Tes
Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
pigServer.setBatchOn();
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate a0, a1, a2;");
- pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("store B into '" + Util.generateURI(Util.encodeEscape(output1.toString()), pigServer.getPigContext()) + "';");
pigServer.registerQuery("C = order B by a2;");
pigServer.registerQuery("D = foreach C generate a2;");
- pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("store D into '" + Util.generateURI(Util.encodeEscape(output2.toString()), pigServer.getPigContext()) + "';");
pigServer.executeBatch();
BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
@@ -2039,7 +2042,7 @@ public class TestPruneColumn extends Tes
}
public void testAliasInRequiredFieldList() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+ pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' using "
+ PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
Modified: pig/trunk/test/org/apache/pig/test/TestSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSample.java?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSample.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSample.java Wed Oct 31 21:32:12 2012
@@ -47,12 +47,15 @@ public class TestSample {
pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
tmpFile = File.createTempFile( this.getClass().getName(), ".txt");
+ tmpFile.delete(); // we don't want the file, just the temp path
+
+ tmpfilepath = Util.removeColon(tmpFile.getCanonicalPath());
+
String input[] = new String[DATALEN];
for(int i = 0; i < DATALEN; i++) {
input[i] = Integer.toString(i);
}
-
- tmpfilepath = tmpFile.getCanonicalPath();
+
Util.createInputFile(cluster, tmpfilepath, input);
}
@@ -90,21 +93,21 @@ public class TestSample {
public void testSample_None()
throws Exception
{
- verify("myid = sample (load '"+ tmpfilepath + "') 0.0;", 0, 0);
+ verify("myid = sample (load '"+ Util.encodeEscape(tmpfilepath) + "') 0.0;", 0, 0);
}
@Test
public void testSample_All()
throws Exception
{
- verify("myid = sample (load '"+ tmpfilepath + "') 1.0;", DATALEN, DATALEN);
+ verify("myid = sample (load '"+ Util.encodeEscape(tmpfilepath) + "') 1.0;", DATALEN, DATALEN);
}
@Test
public void testSample_Some()
throws Exception
{
- verify("myid = sample (load '"+ tmpfilepath + "') 0.5;", DATALEN/3, DATALEN*2/3);
+ verify("myid = sample (load '"+ Util.encodeEscape(tmpfilepath) + "') 0.5;", DATALEN/3, DATALEN*2/3);
}
@Test
Modified: pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed Oct 31 21:32:12 2012
@@ -69,10 +69,12 @@ public class TestStreaming {
private static final String simpleEchoStreamingCommand;
static {
- if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
- simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
- else
- simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ String quote = "'";
+ if (Util.WINDOWS) {
+ quote= "\"";
+ }
+
+ simpleEchoStreamingCommand = "perl -ne "+quote+"print $_"+quote;
}
private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException {
@@ -305,13 +307,13 @@ public class TestStreaming {
// Pig query to run
pigServer.registerQuery(
- "define CMD1 `" + command1.getName() + " foo` " +
+ "define CMD1 `perl " + command1.getName() + " foo` " +
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output(stdout using " + PigStreaming.class.getName() + "(',')) " +
"stderr();");
pigServer.registerQuery(
- "define CMD2 `" + command2.getName() + " bar` " +
+ "define CMD2 `perl " + command2.getName() + " bar` " +
"ship ('" + Util.encodeEscape(command2.toString()) + "') " +
"input('bar' using " + PigStreaming.class.getName() + "(',')) " +
"output(stdout using " + PigStreaming.class.getName() + "(',')) " +
@@ -375,13 +377,13 @@ public class TestStreaming {
pigServer.registerQuery("define PS " + PigStreaming.class.getName() + "(',');");
pigServer.registerQuery(
- "define CMD1 `" + command1.getName() + " foo` " +
+ "define CMD1 `perl " + command1.getName() + " foo` " +
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using PS )" +
"output(stdout using PS ) " +
"stderr();");
pigServer.registerQuery(
- "define CMD2 `" + command2.getName() + " bar` " +
+ "define CMD2 `perl " + command2.getName() + " bar` " +
"ship ('" + Util.encodeEscape(command2.toString()) + "') " +
"input('bar' using PS ) " +
"output(stdout using PS ) " +
@@ -447,12 +449,12 @@ public class TestStreaming {
// Pig query to run
pigServer.registerQuery(
- "define CMD1 `script1.pl foo` " +
+ "define CMD1 `perl script1.pl foo` " +
"cache ('" + c1 + "#script1.pl') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"stderr();");
pigServer.registerQuery(
- "define CMD2 `script2.pl bar` " +
+ "define CMD2 `perl script2.pl bar` " +
"cache ('" + c2 + "#script2.pl') " +
"input('bar' using " + PigStreaming.class.getName() + "(',')) " +
"stderr();");
@@ -513,7 +515,7 @@ public class TestStreaming {
// Pig query to run
pigServer.registerQuery(
- "define CMD `" + command.getName() + " foo bar` " +
+ "define CMD `perl " + command.getName() + " foo bar` " +
"ship ('" + Util.encodeEscape(command.toString()) + "') " +
"output('foo' using " + PigStreaming.class.getName() + "(','), " +
"'bar' using " + PigStreaming.class.getName() + "(',')) " +
@@ -576,7 +578,7 @@ public class TestStreaming {
"define PS " + PigStreaming.class.getName() + "(',');");
pigServer.registerQuery(
- "define CMD `" + command.getName() + " foo bar` " +
+ "define CMD `perl " + command.getName() + " foo bar` " +
"ship ('" + Util.encodeEscape(command.toString()) + "') " +
"output('foo' using PS, " +
"'bar' using PS) " +
@@ -636,7 +638,7 @@ public class TestStreaming {
Util.toDataByteArrays(expectedSecondFields));
// Pig query to run
pigServer.registerQuery(
- "define CMD `" + command.getName() + " foo bar foobar` " +
+ "define CMD `perl " + command.getName() + " foo bar foobar` " +
"ship ('" + Util.encodeEscape(command.toString()) + "') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output('bar', " +
@@ -769,7 +771,7 @@ public class TestStreaming {
};
File command1 = Util.createInputFile("script", "pl", script);
String query =
- "define CMD1 `" + command1.getName() + " foo` " +
+ "define CMD1 `perl " + command1.getName() + " foo` " +
"ship ('" + Util.encodeEscape(command1.toString()) + "') " +
"input('foo' using " + PigStreaming.class.getName() + "(',')) " +
"output(stdout using " + PigStreaming.class.getName() + "(',')) " +
Modified: pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=1404362&r1=1404361&r2=1404362&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Wed Oct 31 21:32:12 2012
@@ -20,7 +20,6 @@ package org.apache.pig.test;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import junit.framework.TestCase;
@@ -42,10 +41,12 @@ public class TestStreamingLocal extends
private static final String simpleEchoStreamingCommand;
static {
- if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
- simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
- else
- simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ String quote = "'";
+ if (Util.WINDOWS) {
+ quote= "\"";
+ }
+
+ simpleEchoStreamingCommand = "perl -ne "+quote+"print $_"+quote;
}
@Before
@@ -98,8 +99,9 @@ public class TestStreamingLocal extends
}
// Pig query to run
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
simpleEchoStreamingCommand + "`;");
@@ -140,8 +142,9 @@ public class TestStreamingLocal extends
.toDataByteArrays(expectedSecondFields));
}
// Pig query to run
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
if(withTypes[i] == true) {
pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
@@ -181,8 +184,9 @@ public class TestStreamingLocal extends
}
// Pig query to run
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
@@ -229,8 +233,9 @@ public class TestStreamingLocal extends
//setupExpectedResults(expectedFirstFields, expectedSecondFields);
// Pig query to run
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
simpleEchoStreamingCommand + "`;");
@@ -276,8 +281,9 @@ public class TestStreamingLocal extends
// Pig query to run
pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
" | " + simpleEchoStreamingCommand + "`;");
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
if(withTypes[i] == true) {
pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);");
} else {
@@ -288,47 +294,40 @@ public class TestStreamingLocal extends
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
-
+
@Test
- // See PIG-2442
- public void testTwoStreamingMultiStore()
+ public void testJoinTwoStreamingRelations()
throws Exception {
- File input = File.createTempFile("tmp", "");
- input.delete();
- Util.createLocalInputFile(input.getAbsolutePath(), new String[] {"first", "second", "third"});
-
- File output1 = File.createTempFile("tmp", "");
- output1.delete();
-
- File output2 = File.createTempFile("tmp", "");
- output2.delete();
-
- pigServer.setBatchOn();
- pigServer.registerQuery("A = load '" + input.getAbsolutePath() + "';");
- pigServer.registerQuery("B1 = stream A through `cat`;");
- pigServer.registerQuery("B1 = foreach B1 generate $0;");
- pigServer.registerQuery("STORE B1 INTO '" + output1.getAbsolutePath() + "' USING PigStorage();");
- pigServer.registerQuery("B2 = STREAM B1 THROUGH `cat`;");
- pigServer.registerQuery("STORE B2 INTO '" + output2.getAbsolutePath() + "' USING PigStorage();");
-
- pigServer.executeBatch();
-
- List<Tuple> list = Util.readFile2TupleList(output1.getAbsolutePath() + File.separator +
- "part-m-00000", "\t");
- assertTrue(list.get(0).get(0).equals("first"));
- assertTrue(list.get(1).get(0).equals("second"));
- assertTrue(list.get(2).get(0).equals("third"));
-
- list = Util.readFile2TupleList(output2.getAbsolutePath() + File.separator +
- "part-m-00000", "\t");
- assertTrue(list.get(0).get(0).equals("first"));
- assertTrue(list.get(1).get(0).equals("second"));
- assertTrue(list.get(2).get(0).equals("third"));
- }
-
- @Test
- public void testJoinTwoStreamingRelations() {
-
+ ArrayList<String> list = new ArrayList<String>();
+ for (int i=0; i<10000; i++) {
+ list.add("A," + i);
+ }
+ File input = Util.createInputFile("tmp", "", list.toArray(new String[0]));
+
+ // Expected results
+ Tuple expected = TupleFactory.getInstance().newTuple(4);
+ expected.set(0, "A");
+ expected.set(1, 0);
+ expected.set(2, "A");
+ expected.set(3, 0);
+
+ pigServer.registerQuery("A = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',') as (a0, a1);");
+ pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);");
+ pigServer.registerQuery("C = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',') as (a0, a1);");
+ pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);");
+ pigServer.registerQuery("E = join B by a0, D by a0;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ int count = 0;
+ while (iter.hasNext()) {
+ Assert.assertEquals(expected.toString(), iter.next().toString());
+ count++;
+ }
+ Assert.assertTrue(count == 1);
}
@Test
@@ -361,8 +360,9 @@ public class TestStreamingLocal extends
// Pig query to run
pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
"` input(stdin);");
- pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
- PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("IP = load '" +
+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) +
+ "' using " + PigStorage.class.getName() + "(',');");
pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
if(withTypes[i] == true) {
pigServer.registerQuery("OP = stream FILTERED_DATA through `" +