You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [11/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/s...

Modified: pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java Thu Nov 27 12:49:54 2014
@@ -33,10 +33,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.Shell;
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
 import org.junit.Test;
@@ -1352,9 +1355,11 @@ public class TestParamSubPreproc {
         File inputFile = Util.createFile(new String[]{"daniel\t10","jenny\t20"});
         File outputFile = File.createTempFile("tmp", "");
         outputFile.delete();
-        String command = "a = load '" + Util.encodeEscape(inputFile.toString()) + "' as ($param1:chararray, $param2:int);\n"
-                + "store a into '" + outputFile.toString() + "';\n"
+        PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+        String command = "a = load '" + Util.generateURI(inputFile.toString(), pc)  + "' as ($param1:chararray, $param2:int);\n"
+                + "store a into '" + Util.generateURI(outputFile.toString(), pc) + "';\n"
                 + "quit\n";
+        System.setProperty("jline.WindowsTerminal.directConsole", "false");
         System.setIn(new ByteArrayInputStream(command.getBytes()));
         org.apache.pig.PigRunner.run(new String[] {"-x", "local", "-p", "param1=name", "-p", "param2=age"}, null);
         File[] partFiles = outputFile.listFiles(new FilenameFilter() {
@@ -1387,4 +1392,4 @@ public class TestParamSubPreproc {
          }
          return result;
      }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Thu Nov 27 12:49:54 2014
@@ -45,23 +45,25 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestPigRunner {
 
-    private static MiniCluster cluster;
+    private static MiniGenericCluster cluster;
+    private static String execType;
 
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
@@ -69,7 +71,8 @@ public class TestPigRunner {
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        cluster = MiniCluster.buildCluster();
+        cluster = MiniGenericCluster.buildCluster();
+        execType = cluster.getExecType().name().toLowerCase();
         PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
         w.println("1\t2\t3");
         w.println("5\t3\t4");
@@ -89,6 +92,7 @@ public class TestPigRunner {
     @Before
     public void setUp() {
         deleteAll(new File(OUTPUT_FILE));
+        Util.resetStateForExecModeSwitch();
     }
 
     @Test
@@ -154,8 +158,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -171,7 +175,7 @@ public class TestPigRunner {
             Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
             assertTrue(conf.getBoolean("stop.on.failure", false));
             assertTrue(!conf.getBoolean("aggregate.warning", true));
-            assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true));
+            assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
             assertTrue(!conf.getBoolean("opt.fetch", true));
         } finally {
             new File(PIG_FILE).delete();
@@ -189,8 +193,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats instanceof EmptyPigStats);
             assertTrue(stats.isSuccessful());
@@ -200,7 +204,7 @@ public class TestPigRunner {
             Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
             assertTrue(conf.getBoolean("stop.on.failure", false));
             assertTrue(!conf.getBoolean("aggregate.warning", true));
-            assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true));
+            assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
             assertTrue(conf.getBoolean("opt.fetch", true));
         } finally {
             new File(PIG_FILE).delete();
@@ -220,8 +224,8 @@ public class TestPigRunner {
         Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
 
         try {
-            String[] args = { inputInDfs.toString() };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, inputInDfs.toString() };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -248,11 +252,17 @@ public class TestPigRunner {
         w.println("C = limit B 2;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        String[] args = { PIG_FILE };
+        String[] args = { "-x", execType, PIG_FILE };
         try {
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 4);
+            if (execType.equals("tez")) {
+                assertEquals(stats.getJobGraph().size(), 1);
+                // 5 vertices
+                assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+            } else {
+                assertEquals(stats.getJobGraph().size(), 4);
+            }
             assertTrue(stats.getJobGraph().getSinks().size() == 1);
             assertTrue(stats.getJobGraph().getSources().size() == 1);
             JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
@@ -263,11 +273,20 @@ public class TestPigRunner {
             assertEquals(2, stats.getRecordWritten());
             assertEquals(12, stats.getBytesWritten());
 
-            assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
-                    0)).getAlias());
-            assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
-                    js).get(0)).getAlias());
-            assertEquals("B", js.getAlias());
+            if (execType.equals("tez")) {
+                assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                // TODO: alias is not set for sample-aggregation/partition/sort job.
+                //       Need to investigate
+                // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+                //        js).get(0)).getAlias());
+            } else {
+                assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+                        js).get(0)).getAlias());
+                assertEquals("B", js.getAlias());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -287,8 +306,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
             assertTrue(stats.getJobGraph().size() == 1);
             // Each output file should include the following:
@@ -336,10 +355,11 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 1);
+            assertEquals(stats.getJobGraph().size(), 1);
+
             // Each output file should include the following:
             // output:
             //   5\t3\t4\n
@@ -383,15 +403,19 @@ public class TestPigRunner {
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
             Iterator<JobStats> iter = stats.getJobGraph().iterator();
             while (iter.hasNext()) {
                  JobStats js=iter.next();
-                 if(js.getState().name().equals("FAILED")) {
-                     List<Operator> ops=stats.getJobGraph().getSuccessors(js);
-                     for(Operator op : ops ) {
-                         assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                 if (execType.equals("tez")) {
+                     assertEquals(js.getState().name(), "FAILED");
+                 } else {
+                     if(js.getState().name().equals("FAILED")) {
+                         List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+                         for(Operator op : ops ) {
+                             assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                         }
                      }
                  }
             }
@@ -410,7 +434,7 @@ public class TestPigRunner {
         w.println("C = foreach B generate group, COUNT(A);");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        String[] args = { "-c", PIG_FILE };
+        String[] args = { "-x", execType, "-c", PIG_FILE };
         PigStats stats = PigRunner.run(args, null);
         assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
         // TODO: error message has changed. Need to catch the new message generated from the
@@ -422,22 +446,23 @@ public class TestPigRunner {
 
     @Test
     public void simpleNegativeTest2() throws Exception {
-        String[] args = { "-c", "-e", "this is a test" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        String[] args = { "-x", execType, "-c", "-e", "this is a test" };
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
         assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
     }
 
     @Test
     public void simpleNegativeTest3() throws Exception {
-        String[] args = { "-c", "-y" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        String[] args = { "-x", execType, "-c", "-y" };
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
         assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
-        assertEquals("Found unknown option (-y) at position 2",
+        assertEquals("Found unknown option (-y) at position 4",
                 stats.getErrorMessage());
     }
 
     @Test
-    public void NagetiveTest() throws Exception {
+    public void streamNegativeTest() throws Exception {
+        Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType()));
         final String OUTPUT_FILE_2 = "output2";
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
@@ -451,21 +476,32 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
             assertTrue(!stats.isSuccessful());
-            assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
-            assertTrue(stats.getJobGraph().size() == 2);
-            JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
-            assertTrue(job.isSuccessful());
-            job = (JobStats)stats.getJobGraph().getSinks().get(0);
-            assertTrue(!job.isSuccessful());
-            assertTrue(stats.getOutputStats().size() == 3);
-            for (OutputStats output : stats.getOutputStats()) {
-                if (output.getName().equals("ee")) {
+            if (execType.equals("tez")) {
+                assertTrue(stats.getReturnCode() == ReturnCode.FAILURE);
+                assertTrue(stats.getJobGraph().size() == 1);
+                JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0);
+                assertTrue(!job.isSuccessful());
+                assertTrue(stats.getOutputStats().size() == 3);
+                for (OutputStats output : stats.getOutputStats()) {
                     assertTrue(!output.isSuccessful());
-                } else {
-                    assertTrue(output.isSuccessful());
+                }
+            } else {
+                assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+                assertTrue(stats.getJobGraph().size() == 2);
+                JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+                assertTrue(job.isSuccessful());
+                job = (JobStats)stats.getJobGraph().getSinks().get(0);
+                assertTrue(!job.isSuccessful());
+                assertTrue(stats.getOutputStats().size() == 3);
+                for (OutputStats output : stats.getOutputStats()) {
+                    if (output.getName().equals("ee")) {
+                        assertTrue(!output.isSuccessful());
+                    } else {
+                        assertTrue(output.isSuccessful());
+                    }
                 }
             }
         } finally {
@@ -519,8 +555,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -554,8 +590,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -586,8 +622,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -613,9 +649,9 @@ public class TestPigRunner {
         String jarName = Util.findPigJarName();
 
         String[] args = { "-Dpig.additional.jars=" + jarName,
-                "-Dmapred.job.queue.name=default",
+                "-Dmapred.job.queue.name=default", "-x", execType,
                 "-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
         Util.deleteFile(cluster, OUTPUT_FILE);
         PigContext ctx = stats.getPigContext();
@@ -633,7 +669,7 @@ public class TestPigRunner {
     @Test
     public void classLoaderTest() throws Exception {
         // Skip in hadoop 23 test, see PIG-2449
-        if (Util.isHadoop23() || Util.isHadoop2_0())
+        if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
             return;
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
@@ -642,8 +678,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
         } finally {
             new File(PIG_FILE).delete();
@@ -658,8 +694,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
@@ -670,7 +706,7 @@ public class TestPigRunner {
 
     @Test // PIG-2006
     public void testEmptyFile() throws IOException {
-        File f1 = new File( PIG_FILE );
+        File f1 = new File(PIG_FILE);
 
         FileWriter fw1 = new FileWriter(f1);
         fw1.close();
@@ -698,7 +734,7 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
 
             assertTrue(!stats.isSuccessful());
@@ -721,7 +757,7 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
 
             assertTrue(!stats.isSuccessful());
@@ -751,8 +787,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -783,8 +819,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -815,17 +851,22 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
             assertEquals(1, stats.getNumberJobs());
             List<InputStats> inputs = stats.getInputStats();
             assertEquals(2, inputs.size());
-            for (InputStats instats : inputs) {
-                // the multi-input counters are disabled
-                assertEquals(-1, instats.getNumberRecords());
+            if (execType.equals("tez")) {
+                assertEquals(5, inputs.get(0).getNumberRecords());
+                assertEquals(5, inputs.get(1).getNumberRecords());
+            } else {
+                for (InputStats instats : inputs) {
+                    // the multi-input counters are disabled
+                    assertEquals(-1, instats.getNumberRecords());
+                }
             }
 
             List<OutputStats> outputs = stats.getOutputStats();
@@ -853,27 +894,44 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+
+            String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP;
+            String FS_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP;
 
-            Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
-            assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
-            assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
-            assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
-            assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
-            assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-
-            // Skip for hadoop 20.203+, See PIG-2446
-            if (Util.isHadoop203plus())
-                return;
+            if (execType.equals("tez")) {
+                Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+                assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        "INPUT_RECORDS_PROCESSED").getValue());
+                assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+                assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        "OUTPUT_RECORDS").getValue());
+                assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+                assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            } else {
+                Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+                assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
+                assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
+                assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+                assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
+                assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+
+                // Skip for hadoop 20.203+, See PIG-2446
+                if (Util.isHadoop203plus())
+                    return;
 
-            assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+                assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -892,17 +950,22 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
             assertEquals(1, stats.getNumberJobs());
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
-            for (OutputStats outstats : outputs) {
-                // the multi-output counters are disabled
-                assertEquals(-1, outstats.getNumberRecords());
+            if (execType.equals("tez")) {
+                assertEquals(outputs.get(0).getNumberRecords(), 5);
+                assertEquals(outputs.get(1).getNumberRecords(), 2);
+            } else {
+                for (OutputStats outstats : outputs) {
+                    // the multi-output counters are disabled
+                    assertEquals(-1, outstats.getNumberRecords());
+                }
             }
 
             List<InputStats> inputs = stats.getInputStats();
@@ -937,8 +1000,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-F", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, "-F", PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(!stats.isSuccessful());
 
@@ -967,6 +1030,15 @@ public class TestPigRunner {
         private static final int JobsSubmitted = 1;
         private static final int JobStarted = 2;
         private static final int JobFinished = 3;
+        private String execType;
+
+        public TestNotificationListener(String execType) {
+            this.execType = execType;
+        }
+
+        public TestNotificationListener() {
+            this.execType = "mr";
+        }
 
         @Override
         public void initialPlanNotification(String id, OperatorPlan<?> plan) {

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java Thu Nov 27 12:49:54 2014
@@ -115,7 +115,7 @@ public class TestPigScriptParser {
                 // backslash - hence 4. In a pig script in a file, this would be
                 // www\\.xyz\\.com
                 "define minelogs org.apache.pig.test.RegexGroupCount('www\\\\.xyz\\\\.com/sports');" ,
-                "A = load '" + Util.generateURI(Util.encodeEscape(f.getAbsolutePath()), ps.getPigContext()) + "'  using PigStorage() as (source : chararray);" ,
+                "A = load '" + Util.generateURI(f.getAbsolutePath(), ps.getPigContext()) + "'  using PigStorage() as (source : chararray);" ,
                 "B = foreach A generate minelogs(source) as sportslogs;" };
         for (String line : queryLines) {
             ps.registerQuery(line);

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigServer.java Thu Nov 27 12:49:54 2014
@@ -867,7 +867,7 @@ public class TestPigServer {
 
         assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
         assertEquals("true", properties.getProperty("aggregate.warning"));
-        assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+        assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
         assertEquals("false", properties.getProperty("stop.on.failure"));
 
         //Test with properties file
@@ -877,7 +877,7 @@ public class TestPigServer {
 
         assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
         assertEquals("true", properties.getProperty("aggregate.warning"));
-        assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+        assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
         assertEquals("false", properties.getProperty("stop.on.failure"));
 
         PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
@@ -889,7 +889,7 @@ public class TestPigServer {
 
         properties = PropertiesUtil.loadDefaultProperties();
         assertEquals("false", properties.getProperty("aggregate.warning"));
-        assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+        assertEquals("false", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
         assertEquals("true", properties.getProperty("stop.on.failure"));
 
         propertyFile.delete();
@@ -968,13 +968,13 @@ public class TestPigServer {
 
         pigServer.setValidateEachStatement(true);
         pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
-        pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';");
+        pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation1';");
         pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
-        pigServer.registerQuery("store B into '" + tempDir + "/testGruntValidation2';"); // This should pass
+        pigServer.registerQuery("store B into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation2';"); // This should pass
         boolean validationExceptionCaptured = false;
         try {
             // This should fail due to output validation
-            pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';");
+            pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(),pigServer.getPigContext()) + "/testGruntValidation1';");
         } catch (FrontendException e) {
             validationExceptionCaptured = true;
         }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java Thu Nov 27 12:49:54 2014
@@ -34,13 +34,12 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
- * Ensure that jars marked as predeployed are not included in the generated 
- * job jar. 
+ * Ensure that jars marked as predeployed are not included in the generated
+ * job jar.
  */
 public class TestPredeployedJar {
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@@ -53,44 +52,44 @@ public class TestPredeployedJar {
         File logFile = File.createTempFile("log", "");
         FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
         logger.addAppender(appender);
-        
+
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getConfiguration());
-        pigServer.getPigContext().getProperties().put(PigConfiguration.OPT_FETCH, "false");
+        pigServer.getPigContext().getProperties().put(PigConfiguration.PIG_OPT_FETCH, "false");
         String[] inputData = new String[] { "hello", "world" };
         Util.createInputFile(cluster, "a.txt", inputData);
-        String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
+        String jodaTimeJar = JarManager.findContainingJar(org.joda.time.DateTime.class);
 
         pigServer.registerQuery("a = load 'a.txt' as (line:chararray);");
         Iterator<Tuple> it = pigServer.openIterator("a");
 
         String content = FileUtils.readFileToString(logFile);
-        Assert.assertTrue(content.contains(jacksonJar));
-        
+        Assert.assertTrue(content.contains(jodaTimeJar));
+
         logFile = File.createTempFile("log", "");
-        
-        // Now let's mark the jackson jar as predeployed.
-        pigServer.getPigContext().markJarAsPredeployed(jacksonJar);
+
+        // Now let's mark the guava jar as predeployed.
+        pigServer.getPigContext().markJarAsPredeployed(jodaTimeJar);
         it = pigServer.openIterator("a");
 
         content = FileUtils.readFileToString(logFile);
-        Assert.assertFalse(content.contains(jacksonJar));
+        Assert.assertFalse(content.contains(jodaTimeJar));
     }
-    
+
     @Test
     public void testPredeployedJarsProperty() throws ExecException {
         Properties p = new Properties();
         p.setProperty("pig.predeployed.jars", "zzz");
         PigServer pigServer = new PigServer(ExecType.LOCAL, p);
-        
+
         Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("zzz"));
-        
+
         p = new Properties();
         p.setProperty("pig.predeployed.jars", "aaa" + File.pathSeparator + "bbb");
         pigServer = new PigServer(ExecType.LOCAL, p);
-        
+
         Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("aaa"));
         Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("bbb"));
-        
+
         Assert.assertFalse(pigServer.getPigContext().predeployedJars.contains("zzz"));
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Thu Nov 27 12:49:54 2014
@@ -282,7 +282,7 @@ public class TestPruneColumn {
 
     @Test
     public void testLoadForEach1() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a1, a2;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -305,7 +305,7 @@ public class TestPruneColumn {
 
     @Test
     public void testLoadForEach2() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a0, a2;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -328,7 +328,7 @@ public class TestPruneColumn {
 
     @Test
     public void testLoadForEach3() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a0, a1;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -351,8 +351,8 @@ public class TestPruneColumn {
 
     @Test
     public void testJoin1() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
         pigServer.registerQuery("C = join A by a1, B by b1;");
         pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
 
@@ -373,8 +373,8 @@ public class TestPruneColumn {
 
     @Test
     public void testJoin2() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
         pigServer.registerQuery("C = join A by a1, B by b1;");
         pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
 
@@ -395,7 +395,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForEachFilter() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = filter A by a2==3;");
         pigServer.registerQuery("C = foreach B generate a0, a1;");
 
@@ -414,7 +414,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForEach1() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a0, a1+a2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -438,7 +438,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForEach2() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a0 as b0, *;");
 
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -466,7 +466,7 @@ public class TestPruneColumn {
 
     @Test
     public void testSplit1() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
         pigServer.registerQuery("D = foreach B generate $1;");
 
@@ -484,7 +484,7 @@ public class TestPruneColumn {
 
     @Test
     public void testSplit2() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
         pigServer.registerQuery("D = foreach B generate $1;");
 
@@ -502,7 +502,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForeachNoSchema1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
         pigServer.registerQuery("B = foreach A generate $1, $2;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -525,7 +525,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForeachNoSchema2() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
         pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -548,8 +548,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
         pigServer.registerQuery("C = cogroup A by $1, B by $1;");
         pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -579,7 +579,7 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup2() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
         pigServer.registerQuery("B = group A all;");
         pigServer.registerQuery("C = foreach B generate $1;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -595,7 +595,7 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
         pigServer.registerQuery("B = group A by $1;");
         pigServer.registerQuery("C = foreach B generate $1, '1';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -621,8 +621,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
         pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
         pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -655,7 +655,7 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup5() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = group A by (a0, a1);");
         pigServer.registerQuery("C = foreach B generate flatten(group);");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -681,7 +681,7 @@ public class TestPruneColumn {
 
     @Test
     public void testDistinct1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = distinct A;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -699,7 +699,7 @@ public class TestPruneColumn {
 
     @Test
     public void testStream1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -723,7 +723,7 @@ public class TestPruneColumn {
 
     @Test
     public void testBinCond1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
         pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -747,8 +747,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup6() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
         pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
         pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -776,8 +776,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup7() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
         pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
         pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -807,8 +807,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCross1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
         pigServer.registerQuery("C = cross A, B;");
         pigServer.registerQuery("D = foreach C generate $0, $3;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -851,8 +851,8 @@ public class TestPruneColumn {
 
     @Test
     public void testUnion1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
         pigServer.registerQuery("C = union A, B;");
         pigServer.registerQuery("D = foreach C generate $0, $2;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -892,8 +892,8 @@ public class TestPruneColumn {
 
     @Test
     public void testFRJoin1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
         pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
         pigServer.registerQuery("D = foreach C generate $0, $3;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -920,7 +920,7 @@ public class TestPruneColumn {
 
     @Test
     public void testFilter1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = order A by a1;");
         pigServer.registerQuery("C = limit B 10;");
         pigServer.registerQuery("D = foreach C generate $0;");
@@ -945,7 +945,7 @@ public class TestPruneColumn {
 
     @Test
     public void testFilter2() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = filter A by a0+a2 == 4;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -969,7 +969,7 @@ public class TestPruneColumn {
 
     @Test
     public void testOrderBy1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = order A by $0;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -993,7 +993,7 @@ public class TestPruneColumn {
 
     @Test
     public void testOrderBy2() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = order A by *;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1017,7 +1017,7 @@ public class TestPruneColumn {
 
     @Test
     public void testCogroup8() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = group A by *;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1041,8 +1041,8 @@ public class TestPruneColumn {
 
     @Test
     public void testJoin3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
         pigServer.registerQuery("C = join A by *, B by * using 'replicated';");
         pigServer.registerQuery("D = foreach C generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1066,7 +1066,7 @@ public class TestPruneColumn {
 
     @Test
     public void testLoadForEach4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate *;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1090,7 +1090,7 @@ public class TestPruneColumn {
 
     @Test
     public void testForEachUDF() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
         pigServer.registerQuery("B = foreach A generate StringSize(*);");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1114,8 +1114,8 @@ public class TestPruneColumn {
 
     @Test
     public void testOutJoin1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile6.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
         pigServer.registerQuery("C = join A by $0 left, B by $0;");
         pigServer.registerQuery("D = foreach C generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1144,7 +1144,7 @@ public class TestPruneColumn {
 
     @Test
     public void testFilter3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1168,7 +1168,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKey1() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
         pigServer.registerQuery("B = foreach A generate a0, a1#'key1';");
 
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1192,7 +1192,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKey2() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
         pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
         pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
 
@@ -1218,7 +1218,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKey3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
         pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
         pigServer.registerQuery("C = group B all;");
 
@@ -1235,7 +1235,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKey4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
         pigServer.registerQuery("B = limit A 10;");
         pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
 
@@ -1260,7 +1260,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKey5() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
         pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
         pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
 
@@ -1285,7 +1285,7 @@ public class TestPruneColumn {
 
     @Test
     public void testMapKeyInSplit1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
         pigServer.registerQuery("B = foreach A generate m#'key1' as key1;");
         pigServer.registerQuery("C = foreach A generate m#'key2' as key2;");
         pigServer.registerQuery("D = join B by key1, C by key2;");
@@ -1306,7 +1306,7 @@ public class TestPruneColumn {
     @SuppressWarnings("rawtypes")
     @Test
     public void testMapKeyInSplit2() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
         pigServer.registerQuery("B = filter A by m#'cond'==1;");
         pigServer.registerQuery("C = filter B by m#'key1'==1;");
         pigServer.registerQuery("D = filter B by m#'key2'==2;");
@@ -1331,7 +1331,7 @@ public class TestPruneColumn {
 
     @Test
     public void testConstantPlan() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate 1, a2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1355,7 +1355,7 @@ public class TestPruneColumn {
 
     @Test
     public void testPlainPlan() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.registerQuery("B = order A by $0;");
 
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -1386,7 +1386,7 @@ public class TestPruneColumn {
         intermediateFile.delete(); // delete since we don't want the file to be present
         String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
 
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.store("A", clusterPath, "BinStorage()");
 
         pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
@@ -1417,7 +1417,7 @@ public class TestPruneColumn {
         intermediateFile.delete(); // delete since we don't want the file to be present
         String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
 
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
         pigServer.store("A", clusterPath, "BinStorage()");
 
         pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
@@ -1448,7 +1448,7 @@ public class TestPruneColumn {
 
     @Test
     public void testProjectCastKeyLookup() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
                 + "' as (a0, a1);");
 
         pigServer.registerQuery("B = foreach A generate a1#'key1';");
@@ -1474,7 +1474,7 @@ public class TestPruneColumn {
 
     @Test
     public void testRelayFlattenMap() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext())
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
                 + "' as (a0, a1:map[]);");
 
         pigServer.registerQuery("B = foreach A generate flatten(a1);");
@@ -1500,8 +1500,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCrossAtLeastOneColumnOneInput() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
         pigServer.registerQuery("C = cross A, B;");
         pigServer.registerQuery("D = foreach C generate $0;");
 
@@ -1538,8 +1538,8 @@ public class TestPruneColumn {
 
     @Test
     public void testComplex1() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile7.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile8.toString()), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
         pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;");
         pigServer.registerQuery("C = join A by $0, B1 by $0;");
         pigServer.registerQuery("D = order C by $4;");
@@ -1568,7 +1568,7 @@ public class TestPruneColumn {
         pigServer.getPigContext().getProperties().setProperty(
                 PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
                 ObjectSerializer.serialize(optimizerRules));
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile13.toString()), pigServer.getPigContext()) + "' as (a:int, b:chararray);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile13.toString(), pigServer.getPigContext()) + "' as (a:int, b:chararray);");
         pigServer.registerQuery("B = FOREACH A generate a;");
         pigServer.registerQuery("C = GROUP B by a;");
         pigServer.registerQuery("D = filter C by group > 0 and group < 100;");
@@ -1598,8 +1598,8 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup8() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
         pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
         pigServer.registerQuery("D = foreach C generate $0, $1;");
 
@@ -1631,7 +1631,7 @@ public class TestPruneColumn {
     // See PIG-1128
     @Test
     public void testUserDefinedSchema() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
         pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
         pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
 
@@ -1653,7 +1653,7 @@ public class TestPruneColumn {
     // See PIG-1127
     @Test
     public void testSharedSchemaObject() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile10.toString()), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
         pigServer.registerQuery("B = foreach A generate a1;");
         pigServer.registerQuery("C = limit B 10;");
 
@@ -1671,8 +1671,8 @@ public class TestPruneColumn {
     // See PIG-1142
     @Test
     public void testJoin4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
         pigServer.registerQuery("C = join A by a2, B by b2;");
         pigServer.registerQuery("D = foreach C generate $0,  $1,  $2;");
 
@@ -1696,7 +1696,7 @@ public class TestPruneColumn {
 
     @Test
     public void testFilter4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
         pigServer.registerQuery("B = filter A by a2==3;");
         pigServer.registerQuery("C = foreach B generate $2;");
 
@@ -1713,7 +1713,7 @@ public class TestPruneColumn {
 
     @Test
     public void testSplit3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
         pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
         pigServer.registerQuery("C = foreach B generate $2;");
 
@@ -1730,7 +1730,7 @@ public class TestPruneColumn {
 
     @Test
     public void testOrderBy3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = order A by a2;");
         pigServer.registerQuery("C = foreach B generate a2;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1754,9 +1754,9 @@ public class TestPruneColumn {
 
     @Test
     public void testCogroup9() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
-        pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
         pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
         pigServer.registerQuery("E = foreach D generate $1, $2;");
         Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1781,8 +1781,8 @@ public class TestPruneColumn {
     // See PIG-1165
     @Test
     public void testOrderbyWrongSignature() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
         pigServer.registerQuery("C = order A by a1;");
         pigServer.registerQuery("D = join C by a1, B by b0;");
         pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
@@ -1802,8 +1802,8 @@ public class TestPruneColumn {
     // See PIG-1146
     @Test
     public void testUnionMixedPruning() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
-        pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
         pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
         pigServer.registerQuery("D = union A, C;");
         pigServer.registerQuery("E = foreach D generate $0, $2;");
@@ -1846,9 +1846,9 @@ public class TestPruneColumn {
     // See PIG-1176
     @Test
     public void testUnionMixedSchemaPruning() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate a0;;");
-        pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
         pigServer.registerQuery("D = foreach C generate $0;");
         pigServer.registerQuery("E = union B, D;");
         Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -1921,7 +1921,7 @@ public class TestPruneColumn {
     // See PIG-1210
     @Test
     public void testFieldsToReadDuplicatedEntry() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
 
@@ -1941,7 +1941,7 @@ public class TestPruneColumn {
     // See PIG-1272
     @Test
     public void testSplit4() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate a0;");
         pigServer.registerQuery("C = join A by a0, B by a0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -1959,7 +1959,7 @@ public class TestPruneColumn {
 
     @Test
     public void testSplit5() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile11.toString()), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
         pigServer.registerQuery("B = foreach A generate a0, a1;");
         pigServer.registerQuery("C = join A by a0, B by a0;");
         pigServer.registerQuery("D = filter C by A::a1>=B::a1;");
@@ -1981,7 +1981,7 @@ public class TestPruneColumn {
     // See PIG-1493
     @Test
     public void testInconsistentPruning() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
         pigServer.registerQuery("B = foreach A generate CONCAT(a0,a1) as b0, a0, a2;");
         pigServer.registerQuery("C = foreach B generate a0, a2;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
@@ -2003,12 +2003,12 @@ public class TestPruneColumn {
         Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
         Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
         pigServer.setBatchOn();
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
         pigServer.registerQuery("B = foreach A generate a0, a1, a2;");
-        pigServer.registerQuery("store B into '" + Util.generateURI(Util.encodeEscape(output1.toString()), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';");
         pigServer.registerQuery("C = order B by a2;");
         pigServer.registerQuery("D = foreach C generate a2;");
-        pigServer.registerQuery("store D into '" + Util.generateURI(Util.encodeEscape(output2.toString()), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
         pigServer.executeBatch();
 
         BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
@@ -2093,7 +2093,7 @@ public class TestPruneColumn {
     }
 
     public void testAliasInRequiredFieldList() throws Exception{
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' using "
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
                 + PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
         pigServer.registerQuery("B = foreach A generate a1, a2;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -2110,7 +2110,7 @@ public class TestPruneColumn {
 
     @Test
     public void testCogroup10() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0, a1:double);");
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1:double);");
         pigServer.registerQuery("B = foreach A generate a0, a1, 0 as joinField;");
         pigServer.registerQuery("C = group B all;");
         pigServer.registerQuery("D = foreach C generate 0 as joinField, SUM(B.a1) as total;");
@@ -2179,4 +2179,4 @@ public class TestPruneColumn {
         assertTrue(checkLogFileMessage(new String[]{"Map key required for event_serve: $0->[event_guid, filter_key, receive_time]",
                 "Map key required for raw: $0->[cm_serve_id, cm_serve_timestamp_ms, p_url, source, type]"}));
     }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank3.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRank3.java Thu Nov 27 12:49:54 2014
@@ -53,6 +53,12 @@ public class TestRank3 {
 
             data = resetData(pigServer);
             data.set("empty");
+            data.set("testsplit",
+                    tuple(1, 2),
+                    tuple(1, 2),
+                    tuple(3, 1),
+                    tuple(2, 4),
+                    tuple(2, 3));
             data.set(
                     "testcascade",
                     tuple(3,2,3),
@@ -156,6 +162,39 @@ public class TestRank3 {
       verifyExpected(data.get("empty_result"), expected);
     }
 
+    @Test
+    public void testRankWithSplitInMap() throws Exception {
+        String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+            + "R2 = rank R1 by a ;"
+            + "R3 = rank R1 ;"
+            + "R4 = union R2, R3;"
+            + "store R4 into 'R4' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "(1L,1,2)",
+                        "(2L,1,2)", "(3L,3,1)", "(4L,2,4)", "(5L,2,3)", "(1L,1,2)",
+                        "(1L,1,2)", "(3L,2,3)", "(3L,2,4)", "(5L,3,1)" });
+        Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+    }
+
+    @Test
+    public void testRankWithSplitInReduce() throws Exception {
+        String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+            + "R1 = ORDER R1 by b;"
+            + "R2 = rank R1 by a ;"
+            + "R3 = rank R1;"
+            + "R4 = union R2, R3;"
+            + "store R4 into 'R4' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "(1L,3,1)",
+                        "(2L,1,2)", "(3L,1,2)", "(4L,2,3)", "(5L,2,4)", "(1L,1,2)",
+                        "(1L,1,2)", "(3L,2,4)", "(3L,2,3)", "(5L,3,1)" });
+        Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+    }
+
     public void verifyExpected(List<Tuple> out, Set<Tuple> expected) {
         for (Tuple tup : out) {
             assertTrue(expected + " contains " + tup, expected.contains(tup));