You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [11/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/s...
Modified: pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java Thu Nov 27 12:49:54 2014
@@ -33,10 +33,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Shell;
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
import org.apache.pig.tools.parameters.ParseException;
import org.junit.Test;
@@ -1352,9 +1355,11 @@ public class TestParamSubPreproc {
File inputFile = Util.createFile(new String[]{"daniel\t10","jenny\t20"});
File outputFile = File.createTempFile("tmp", "");
outputFile.delete();
- String command = "a = load '" + Util.encodeEscape(inputFile.toString()) + "' as ($param1:chararray, $param2:int);\n"
- + "store a into '" + outputFile.toString() + "';\n"
+ PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+ String command = "a = load '" + Util.generateURI(inputFile.toString(), pc) + "' as ($param1:chararray, $param2:int);\n"
+ + "store a into '" + Util.generateURI(outputFile.toString(), pc) + "';\n"
+ "quit\n";
+ System.setProperty("jline.WindowsTerminal.directConsole", "false");
System.setIn(new ByteArrayInputStream(command.getBytes()));
org.apache.pig.PigRunner.run(new String[] {"-x", "local", "-p", "param1=name", "-p", "param2=age"}, null);
File[] partFiles = outputFile.listFiles(new FilenameFilter() {
@@ -1387,4 +1392,4 @@ public class TestParamSubPreproc {
}
return result;
}
-}
\ No newline at end of file
+}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Thu Nov 27 12:49:54 2014
@@ -45,23 +45,25 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestPigRunner {
- private static MiniCluster cluster;
+ private static MiniGenericCluster cluster;
+ private static String execType;
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
@@ -69,7 +71,8 @@ public class TestPigRunner {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- cluster = MiniCluster.buildCluster();
+ cluster = MiniGenericCluster.buildCluster();
+ execType = cluster.getExecType().name().toLowerCase();
PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
w.println("1\t2\t3");
w.println("5\t3\t4");
@@ -89,6 +92,7 @@ public class TestPigRunner {
@Before
public void setUp() {
deleteAll(new File(OUTPUT_FILE));
+ Util.resetStateForExecModeSwitch();
}
@Test
@@ -154,8 +158,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -171,7 +175,7 @@ public class TestPigRunner {
Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
assertTrue(conf.getBoolean("stop.on.failure", false));
assertTrue(!conf.getBoolean("aggregate.warning", true));
- assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true));
+ assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
assertTrue(!conf.getBoolean("opt.fetch", true));
} finally {
new File(PIG_FILE).delete();
@@ -189,8 +193,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats instanceof EmptyPigStats);
assertTrue(stats.isSuccessful());
@@ -200,7 +204,7 @@ public class TestPigRunner {
Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
assertTrue(conf.getBoolean("stop.on.failure", false));
assertTrue(!conf.getBoolean("aggregate.warning", true));
- assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true));
+ assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
assertTrue(conf.getBoolean("opt.fetch", true));
} finally {
new File(PIG_FILE).delete();
@@ -220,8 +224,8 @@ public class TestPigRunner {
Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
try {
- String[] args = { inputInDfs.toString() };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, inputInDfs.toString() };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -248,11 +252,17 @@ public class TestPigRunner {
w.println("C = limit B 2;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
try {
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 4);
+ if (execType.equals("tez")) {
+ assertEquals(stats.getJobGraph().size(), 1);
+ // 5 vertices
+ assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+ } else {
+ assertEquals(stats.getJobGraph().size(), 4);
+ }
assertTrue(stats.getJobGraph().getSinks().size() == 1);
assertTrue(stats.getJobGraph().getSources().size() == 1);
JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
@@ -263,11 +273,20 @@ public class TestPigRunner {
assertEquals(2, stats.getRecordWritten());
assertEquals(12, stats.getBytesWritten());
- assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
- 0)).getAlias());
- assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
- js).get(0)).getAlias());
- assertEquals("B", js.getAlias());
+ if (execType.equals("tez")) {
+ assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ // TODO: alias is not set for sample-aggregation/partition/sort job.
+ // Need to investigate
+ // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+ // js).get(0)).getAlias());
+ } else {
+ assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+ js).get(0)).getAlias());
+ assertEquals("B", js.getAlias());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -287,8 +306,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertTrue(stats.getJobGraph().size() == 1);
// Each output file should include the following:
@@ -336,10 +355,11 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 1);
+ assertEquals(stats.getJobGraph().size(), 1);
+
// Each output file should include the following:
// output:
// 5\t3\t4\n
@@ -383,15 +403,19 @@ public class TestPigRunner {
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
Iterator<JobStats> iter = stats.getJobGraph().iterator();
while (iter.hasNext()) {
JobStats js=iter.next();
- if(js.getState().name().equals("FAILED")) {
- List<Operator> ops=stats.getJobGraph().getSuccessors(js);
- for(Operator op : ops ) {
- assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+ if (execType.equals("tez")) {
+ assertEquals(js.getState().name(), "FAILED");
+ } else {
+ if(js.getState().name().equals("FAILED")) {
+ List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+ for(Operator op : ops ) {
+ assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+ }
}
}
}
@@ -410,7 +434,7 @@ public class TestPigRunner {
w.println("C = foreach B generate group, COUNT(A);");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
- String[] args = { "-c", PIG_FILE };
+ String[] args = { "-x", execType, "-c", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
// TODO: error message has changed. Need to catch the new message generated from the
@@ -422,22 +446,23 @@ public class TestPigRunner {
@Test
public void simpleNegativeTest2() throws Exception {
- String[] args = { "-c", "-e", "this is a test" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-c", "-e", "this is a test" };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
}
@Test
public void simpleNegativeTest3() throws Exception {
- String[] args = { "-c", "-y" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-c", "-y" };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
- assertEquals("Found unknown option (-y) at position 2",
+ assertEquals("Found unknown option (-y) at position 4",
stats.getErrorMessage());
}
@Test
- public void NagetiveTest() throws Exception {
+ public void streamNegativeTest() throws Exception {
+ Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType()));
final String OUTPUT_FILE_2 = "output2";
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
@@ -451,21 +476,32 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
- assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
- assertTrue(stats.getJobGraph().size() == 2);
- JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
- assertTrue(job.isSuccessful());
- job = (JobStats)stats.getJobGraph().getSinks().get(0);
- assertTrue(!job.isSuccessful());
- assertTrue(stats.getOutputStats().size() == 3);
- for (OutputStats output : stats.getOutputStats()) {
- if (output.getName().equals("ee")) {
+ if (execType.equals("tez")) {
+ assertTrue(stats.getReturnCode() == ReturnCode.FAILURE);
+ assertTrue(stats.getJobGraph().size() == 1);
+ JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0);
+ assertTrue(!job.isSuccessful());
+ assertTrue(stats.getOutputStats().size() == 3);
+ for (OutputStats output : stats.getOutputStats()) {
assertTrue(!output.isSuccessful());
- } else {
- assertTrue(output.isSuccessful());
+ }
+ } else {
+ assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+ assertTrue(stats.getJobGraph().size() == 2);
+ JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+ assertTrue(job.isSuccessful());
+ job = (JobStats)stats.getJobGraph().getSinks().get(0);
+ assertTrue(!job.isSuccessful());
+ assertTrue(stats.getOutputStats().size() == 3);
+ for (OutputStats output : stats.getOutputStats()) {
+ if (output.getName().equals("ee")) {
+ assertTrue(!output.isSuccessful());
+ } else {
+ assertTrue(output.isSuccessful());
+ }
}
}
} finally {
@@ -519,8 +555,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -554,8 +590,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -586,8 +622,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -613,9 +649,9 @@ public class TestPigRunner {
String jarName = Util.findPigJarName();
String[] args = { "-Dpig.additional.jars=" + jarName,
- "-Dmapred.job.queue.name=default",
+ "-Dmapred.job.queue.name=default", "-x", execType,
"-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
Util.deleteFile(cluster, OUTPUT_FILE);
PigContext ctx = stats.getPigContext();
@@ -633,7 +669,7 @@ public class TestPigRunner {
@Test
public void classLoaderTest() throws Exception {
// Skip in hadoop 23 test, see PIG-2449
- if (Util.isHadoop23() || Util.isHadoop2_0())
+ if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
return;
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
@@ -642,8 +678,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
} finally {
new File(PIG_FILE).delete();
@@ -658,8 +694,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
@@ -670,7 +706,7 @@ public class TestPigRunner {
@Test // PIG-2006
public void testEmptyFile() throws IOException {
- File f1 = new File( PIG_FILE );
+ File f1 = new File(PIG_FILE);
FileWriter fw1 = new FileWriter(f1);
fw1.close();
@@ -698,7 +734,7 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
@@ -721,7 +757,7 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
@@ -751,8 +787,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -783,8 +819,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -815,17 +851,22 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
- for (InputStats instats : inputs) {
- // the multi-input counters are disabled
- assertEquals(-1, instats.getNumberRecords());
+ if (execType.equals("tez")) {
+ assertEquals(5, inputs.get(0).getNumberRecords());
+ assertEquals(5, inputs.get(1).getNumberRecords());
+ } else {
+ for (InputStats instats : inputs) {
+ // the multi-input counters are disabled
+ assertEquals(-1, instats.getNumberRecords());
+ }
}
List<OutputStats> outputs = stats.getOutputStats();
@@ -853,27 +894,44 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+
+ String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP;
+ String FS_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP;
- Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
- assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
- assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
- assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
- assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
- assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-
- // Skip for hadoop 20.203+, See PIG-2446
- if (Util.isHadoop203plus())
- return;
+ if (execType.equals("tez")) {
+ Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+ assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ "INPUT_RECORDS_PROCESSED").getValue());
+ assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+ assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ "OUTPUT_RECORDS").getValue());
+ assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+ assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ } else {
+ Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+ assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
+ assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
+ assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+ assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
+ assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+
+ // Skip for hadoop 20.203+, See PIG-2446
+ if (Util.isHadoop203plus())
+ return;
- assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -892,17 +950,22 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
- for (OutputStats outstats : outputs) {
- // the multi-output counters are disabled
- assertEquals(-1, outstats.getNumberRecords());
+ if (execType.equals("tez")) {
+ assertEquals(outputs.get(0).getNumberRecords(), 5);
+ assertEquals(outputs.get(1).getNumberRecords(), 2);
+ } else {
+ for (OutputStats outstats : outputs) {
+ // the multi-output counters are disabled
+ assertEquals(-1, outstats.getNumberRecords());
+ }
}
List<InputStats> inputs = stats.getInputStats();
@@ -937,8 +1000,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-F", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-F", PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(!stats.isSuccessful());
@@ -967,6 +1030,15 @@ public class TestPigRunner {
private static final int JobsSubmitted = 1;
private static final int JobStarted = 2;
private static final int JobFinished = 3;
+ private String execType;
+
+ public TestNotificationListener(String execType) {
+ this.execType = execType;
+ }
+
+ public TestNotificationListener() {
+ this.execType = "mr";
+ }
@Override
public void initialPlanNotification(String id, OperatorPlan<?> plan) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java Thu Nov 27 12:49:54 2014
@@ -115,7 +115,7 @@ public class TestPigScriptParser {
// backslash - hence 4. In a pig script in a file, this would be
// www\\.xyz\\.com
"define minelogs org.apache.pig.test.RegexGroupCount('www\\\\.xyz\\\\.com/sports');" ,
- "A = load '" + Util.generateURI(Util.encodeEscape(f.getAbsolutePath()), ps.getPigContext()) + "' using PigStorage() as (source : chararray);" ,
+ "A = load '" + Util.generateURI(f.getAbsolutePath(), ps.getPigContext()) + "' using PigStorage() as (source : chararray);" ,
"B = foreach A generate minelogs(source) as sportslogs;" };
for (String line : queryLines) {
ps.registerQuery(line);
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigServer.java Thu Nov 27 12:49:54 2014
@@ -867,7 +867,7 @@ public class TestPigServer {
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
- assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+ assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
//Test with properties file
@@ -877,7 +877,7 @@ public class TestPigServer {
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
- assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+ assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
@@ -889,7 +889,7 @@ public class TestPigServer {
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("false", properties.getProperty("aggregate.warning"));
- assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+ assertEquals("false", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("true", properties.getProperty("stop.on.failure"));
propertyFile.delete();
@@ -968,13 +968,13 @@ public class TestPigServer {
pigServer.setValidateEachStatement(true);
pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
- pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';");
+ pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation1';");
pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
- pigServer.registerQuery("store B into '" + tempDir + "/testGruntValidation2';"); // This should pass
+ pigServer.registerQuery("store B into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation2';"); // This should pass
boolean validationExceptionCaptured = false;
try {
// This should fail due to output validation
- pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';");
+ pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(),pigServer.getPigContext()) + "/testGruntValidation1';");
} catch (FrontendException e) {
validationExceptionCaptured = true;
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java Thu Nov 27 12:49:54 2014
@@ -34,13 +34,12 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
import org.junit.Assert;
import org.junit.Test;
/**
- * Ensure that jars marked as predeployed are not included in the generated
- * job jar.
+ * Ensure that jars marked as predeployed are not included in the generated
+ * job jar.
*/
public class TestPredeployedJar {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@@ -53,44 +52,44 @@ public class TestPredeployedJar {
File logFile = File.createTempFile("log", "");
FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
logger.addAppender(appender);
-
+
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getConfiguration());
- pigServer.getPigContext().getProperties().put(PigConfiguration.OPT_FETCH, "false");
+ pigServer.getPigContext().getProperties().put(PigConfiguration.PIG_OPT_FETCH, "false");
String[] inputData = new String[] { "hello", "world" };
Util.createInputFile(cluster, "a.txt", inputData);
- String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
+ String jodaTimeJar = JarManager.findContainingJar(org.joda.time.DateTime.class);
pigServer.registerQuery("a = load 'a.txt' as (line:chararray);");
Iterator<Tuple> it = pigServer.openIterator("a");
String content = FileUtils.readFileToString(logFile);
- Assert.assertTrue(content.contains(jacksonJar));
-
+ Assert.assertTrue(content.contains(jodaTimeJar));
+
logFile = File.createTempFile("log", "");
-
- // Now let's mark the jackson jar as predeployed.
- pigServer.getPigContext().markJarAsPredeployed(jacksonJar);
+
+ // Now let's mark the joda-time jar as predeployed.
+ pigServer.getPigContext().markJarAsPredeployed(jodaTimeJar);
it = pigServer.openIterator("a");
content = FileUtils.readFileToString(logFile);
- Assert.assertFalse(content.contains(jacksonJar));
+ Assert.assertFalse(content.contains(jodaTimeJar));
}
-
+
@Test
public void testPredeployedJarsProperty() throws ExecException {
Properties p = new Properties();
p.setProperty("pig.predeployed.jars", "zzz");
PigServer pigServer = new PigServer(ExecType.LOCAL, p);
-
+
Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("zzz"));
-
+
p = new Properties();
p.setProperty("pig.predeployed.jars", "aaa" + File.pathSeparator + "bbb");
pigServer = new PigServer(ExecType.LOCAL, p);
-
+
Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("aaa"));
Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("bbb"));
-
+
Assert.assertFalse(pigServer.getPigContext().predeployedJars.contains("zzz"));
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Thu Nov 27 12:49:54 2014
@@ -282,7 +282,7 @@ public class TestPruneColumn {
@Test
public void testLoadForEach1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -305,7 +305,7 @@ public class TestPruneColumn {
@Test
public void testLoadForEach2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -328,7 +328,7 @@ public class TestPruneColumn {
@Test
public void testLoadForEach3() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -351,8 +351,8 @@ public class TestPruneColumn {
@Test
public void testJoin1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
@@ -373,8 +373,8 @@ public class TestPruneColumn {
@Test
public void testJoin2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
@@ -395,7 +395,7 @@ public class TestPruneColumn {
@Test
public void testForEachFilter() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate a0, a1;");
@@ -414,7 +414,7 @@ public class TestPruneColumn {
@Test
public void testForEach1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1+a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -438,7 +438,7 @@ public class TestPruneColumn {
@Test
public void testForEach2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0 as b0, *;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -466,7 +466,7 @@ public class TestPruneColumn {
@Test
public void testSplit1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
@@ -484,7 +484,7 @@ public class TestPruneColumn {
@Test
public void testSplit2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
@@ -502,7 +502,7 @@ public class TestPruneColumn {
@Test
public void testForeachNoSchema1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -525,7 +525,7 @@ public class TestPruneColumn {
@Test
public void testForeachNoSchema2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -548,8 +548,8 @@ public class TestPruneColumn {
@Test
public void testCoGroup1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by $1, B by $1;");
pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -579,7 +579,7 @@ public class TestPruneColumn {
@Test
public void testCoGroup2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -595,7 +595,7 @@ public class TestPruneColumn {
@Test
public void testCoGroup3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A by $1;");
pigServer.registerQuery("C = foreach B generate $1, '1';");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -621,8 +621,8 @@ public class TestPruneColumn {
@Test
public void testCoGroup4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -655,7 +655,7 @@ public class TestPruneColumn {
@Test
public void testCoGroup5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by (a0, a1);");
pigServer.registerQuery("C = foreach B generate flatten(group);");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -681,7 +681,7 @@ public class TestPruneColumn {
@Test
public void testDistinct1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -699,7 +699,7 @@ public class TestPruneColumn {
@Test
public void testStream1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -723,7 +723,7 @@ public class TestPruneColumn {
@Test
public void testBinCond1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -747,8 +747,8 @@ public class TestPruneColumn {
@Test
public void testCoGroup6() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -776,8 +776,8 @@ public class TestPruneColumn {
@Test
public void testCoGroup7() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -807,8 +807,8 @@ public class TestPruneColumn {
@Test
public void testCross1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -851,8 +851,8 @@ public class TestPruneColumn {
@Test
public void testUnion1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = union A, B;");
pigServer.registerQuery("D = foreach C generate $0, $2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -892,8 +892,8 @@ public class TestPruneColumn {
@Test
public void testFRJoin1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -920,7 +920,7 @@ public class TestPruneColumn {
@Test
public void testFilter1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a1;");
pigServer.registerQuery("C = limit B 10;");
pigServer.registerQuery("D = foreach C generate $0;");
@@ -945,7 +945,7 @@ public class TestPruneColumn {
@Test
public void testFilter2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by a0+a2 == 4;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -969,7 +969,7 @@ public class TestPruneColumn {
@Test
public void testOrderBy1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -993,7 +993,7 @@ public class TestPruneColumn {
@Test
public void testOrderBy2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1017,7 +1017,7 @@ public class TestPruneColumn {
@Test
public void testCogroup8() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1041,8 +1041,8 @@ public class TestPruneColumn {
@Test
public void testJoin3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by *, B by * using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1066,7 +1066,7 @@ public class TestPruneColumn {
@Test
public void testLoadForEach4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1090,7 +1090,7 @@ public class TestPruneColumn {
@Test
public void testForEachUDF() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("B = foreach A generate StringSize(*);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1114,8 +1114,8 @@ public class TestPruneColumn {
@Test
public void testOutJoin1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile6.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("C = join A by $0 left, B by $0;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1144,7 +1144,7 @@ public class TestPruneColumn {
@Test
public void testFilter3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1168,7 +1168,7 @@ public class TestPruneColumn {
@Test
public void testMapKey1() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a0, a1#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1192,7 +1192,7 @@ public class TestPruneColumn {
@Test
public void testMapKey2() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
@@ -1218,7 +1218,7 @@ public class TestPruneColumn {
@Test
public void testMapKey3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = group B all;");
@@ -1235,7 +1235,7 @@ public class TestPruneColumn {
@Test
public void testMapKey4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = limit A 10;");
pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
@@ -1260,7 +1260,7 @@ public class TestPruneColumn {
@Test
public void testMapKey5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
@@ -1285,7 +1285,7 @@ public class TestPruneColumn {
@Test
public void testMapKeyInSplit1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = foreach A generate m#'key1' as key1;");
pigServer.registerQuery("C = foreach A generate m#'key2' as key2;");
pigServer.registerQuery("D = join B by key1, C by key2;");
@@ -1306,7 +1306,7 @@ public class TestPruneColumn {
@SuppressWarnings("rawtypes")
@Test
public void testMapKeyInSplit2() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = filter A by m#'cond'==1;");
pigServer.registerQuery("C = filter B by m#'key1'==1;");
pigServer.registerQuery("D = filter B by m#'key2'==2;");
@@ -1331,7 +1331,7 @@ public class TestPruneColumn {
@Test
public void testConstantPlan() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate 1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1355,7 +1355,7 @@ public class TestPruneColumn {
@Test
public void testPlainPlan() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1386,7 +1386,7 @@ public class TestPruneColumn {
intermediateFile.delete(); // delete since we don't want the file to be present
String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", clusterPath, "BinStorage()");
pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
@@ -1417,7 +1417,7 @@ public class TestPruneColumn {
intermediateFile.delete(); // delete since we don't want the file to be present
String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", clusterPath, "BinStorage()");
pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
@@ -1448,7 +1448,7 @@ public class TestPruneColumn {
@Test
public void testProjectCastKeyLookup() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1);");
pigServer.registerQuery("B = foreach A generate a1#'key1';");
@@ -1474,7 +1474,7 @@ public class TestPruneColumn {
@Test
public void testRelayFlattenMap() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1:map[]);");
pigServer.registerQuery("B = foreach A generate flatten(a1);");
@@ -1500,8 +1500,8 @@ public class TestPruneColumn {
@Test
public void testCrossAtLeastOneColumnOneInput() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0;");
@@ -1538,8 +1538,8 @@ public class TestPruneColumn {
@Test
public void testComplex1() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile7.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile8.toString()), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;");
pigServer.registerQuery("C = join A by $0, B1 by $0;");
pigServer.registerQuery("D = order C by $4;");
@@ -1568,7 +1568,7 @@ public class TestPruneColumn {
pigServer.getPigContext().getProperties().setProperty(
PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
ObjectSerializer.serialize(optimizerRules));
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile13.toString()), pigServer.getPigContext()) + "' as (a:int, b:chararray);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile13.toString(), pigServer.getPigContext()) + "' as (a:int, b:chararray);");
pigServer.registerQuery("B = FOREACH A generate a;");
pigServer.registerQuery("C = GROUP B by a;");
pigServer.registerQuery("D = filter C by group > 0 and group < 100;");
@@ -1598,8 +1598,8 @@ public class TestPruneColumn {
@Test
public void testCoGroup8() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $0, $1;");
@@ -1631,7 +1631,7 @@ public class TestPruneColumn {
// See PIG-1128
@Test
public void testUserDefinedSchema() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
@@ -1653,7 +1653,7 @@ public class TestPruneColumn {
// See PIG-1127
@Test
public void testSharedSchemaObject() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile10.toString()), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
pigServer.registerQuery("B = foreach A generate a1;");
pigServer.registerQuery("C = limit B 10;");
@@ -1671,8 +1671,8 @@ public class TestPruneColumn {
// See PIG-1142
@Test
public void testJoin4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by a2, B by b2;");
pigServer.registerQuery("D = foreach C generate $0, $1, $2;");
@@ -1696,7 +1696,7 @@ public class TestPruneColumn {
@Test
public void testFilter4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate $2;");
@@ -1713,7 +1713,7 @@ public class TestPruneColumn {
@Test
public void testSplit3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
pigServer.registerQuery("C = foreach B generate $2;");
@@ -1730,7 +1730,7 @@ public class TestPruneColumn {
@Test
public void testOrderBy3() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a2;");
pigServer.registerQuery("C = foreach B generate a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1754,9 +1754,9 @@ public class TestPruneColumn {
@Test
public void testCogroup9() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
- pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
pigServer.registerQuery("E = foreach D generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1781,8 +1781,8 @@ public class TestPruneColumn {
// See PIG-1165
@Test
public void testOrderbyWrongSignature() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = order A by a1;");
pigServer.registerQuery("D = join C by a1, B by b0;");
pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
@@ -1802,8 +1802,8 @@ public class TestPruneColumn {
// See PIG-1146
@Test
public void testUnionMixedPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
- pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
pigServer.registerQuery("D = union A, C;");
pigServer.registerQuery("E = foreach D generate $0, $2;");
@@ -1846,9 +1846,9 @@ public class TestPruneColumn {
// See PIG-1176
@Test
public void testUnionMixedSchemaPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;;");
- pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("D = foreach C generate $0;");
pigServer.registerQuery("E = union B, D;");
Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1921,7 +1921,7 @@ public class TestPruneColumn {
// See PIG-1210
@Test
public void testFieldsToReadDuplicatedEntry() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1941,7 +1941,7 @@ public class TestPruneColumn {
// See PIG-1272
@Test
public void testSplit4() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;");
pigServer.registerQuery("C = join A by a0, B by a0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1959,7 +1959,7 @@ public class TestPruneColumn {
@Test
public void testSplit5() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile11.toString()), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
pigServer.registerQuery("C = join A by a0, B by a0;");
pigServer.registerQuery("D = filter C by A::a1>=B::a1;");
@@ -1981,7 +1981,7 @@ public class TestPruneColumn {
// See PIG-1493
@Test
public void testInconsistentPruning() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
pigServer.registerQuery("B = foreach A generate CONCAT(a0,a1) as b0, a0, a2;");
pigServer.registerQuery("C = foreach B generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -2003,12 +2003,12 @@ public class TestPruneColumn {
Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
pigServer.setBatchOn();
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate a0, a1, a2;");
- pigServer.registerQuery("store B into '" + Util.generateURI(Util.encodeEscape(output1.toString()), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("C = order B by a2;");
pigServer.registerQuery("D = foreach C generate a2;");
- pigServer.registerQuery("store D into '" + Util.generateURI(Util.encodeEscape(output2.toString()), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
pigServer.executeBatch();
BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
@@ -2093,7 +2093,7 @@ public class TestPruneColumn {
}
public void testAliasInRequiredFieldList() throws Exception{
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' using "
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+ PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -2110,7 +2110,7 @@ public class TestPruneColumn {
@Test
public void testCogroup10() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0, a1:double);");
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1:double);");
pigServer.registerQuery("B = foreach A generate a0, a1, 0 as joinField;");
pigServer.registerQuery("C = group B all;");
pigServer.registerQuery("D = foreach C generate 0 as joinField, SUM(B.a1) as total;");
@@ -2179,4 +2179,4 @@ public class TestPruneColumn {
assertTrue(checkLogFileMessage(new String[]{"Map key required for event_serve: $0->[event_guid, filter_key, receive_time]",
"Map key required for raw: $0->[cm_serve_id, cm_serve_timestamp_ms, p_url, source, type]"}));
}
-}
\ No newline at end of file
+}
Modified: pig/branches/spark/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank3.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRank3.java Thu Nov 27 12:49:54 2014
@@ -53,6 +53,12 @@ public class TestRank3 {
data = resetData(pigServer);
data.set("empty");
+ data.set("testsplit",
+ tuple(1, 2),
+ tuple(1, 2),
+ tuple(3, 1),
+ tuple(2, 4),
+ tuple(2, 3));
data.set(
"testcascade",
tuple(3,2,3),
@@ -156,6 +162,39 @@ public class TestRank3 {
verifyExpected(data.get("empty_result"), expected);
}
+ @Test
+ public void testRankWithSplitInMap() throws Exception {
+ String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+ + "R2 = rank R1 by a ;"
+ + "R3 = rank R1 ;"
+ + "R4 = union R2, R3;"
+ + "store R4 into 'R4' using mock.Storage();";
+
+ Util.registerMultiLineQuery(pigServer, query);
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] { "(1L,1,2)",
+ "(2L,1,2)", "(3L,3,1)", "(4L,2,4)", "(5L,2,3)", "(1L,1,2)",
+ "(1L,1,2)", "(3L,2,3)", "(3L,2,4)", "(5L,3,1)" });
+ Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+ }
+
+ @Test
+ public void testRankWithSplitInReduce() throws Exception {
+ String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+ + "R1 = ORDER R1 by b;"
+ + "R2 = rank R1 by a ;"
+ + "R3 = rank R1;"
+ + "R4 = union R2, R3;"
+ + "store R4 into 'R4' using mock.Storage();";
+
+ Util.registerMultiLineQuery(pigServer, query);
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] { "(1L,3,1)",
+ "(2L,1,2)", "(3L,1,2)", "(4L,2,3)", "(5L,2,4)", "(1L,1,2)",
+ "(1L,1,2)", "(3L,2,4)", "(3L,2,3)", "(5L,3,1)" });
+ Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+ }
+
public void verifyExpected(List<Tuple> out, Set<Tuple> expected) {
for (Tuple tup : out) {
assertTrue(expected + " contains " + tup, expected.contains(tup));