You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [19/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Added: pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.*;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PredicatePushDownFilterExtractor;
+import org.apache.pig.newplan.logical.expression.*;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.apache.pig.Expression.OpType.OP_NOT;
+import static org.apache.pig.Expression.OpType.OP_NULL;
+import static org.apache.pig.Expression.OpType.OP_AND;
+import static org.apache.pig.Expression.OpType.OP_EQ;
+
+/**
+ * unit tests to test extracting new push-down filter conditions out of the filter
+ * condition in the filter following a load which talks to metadata system (.i.e.
+ * implements {@link LoadMetadata})
+ */
+public class TestNewPredicatePushDown {
+    static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+    String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+            "age:int, browser:map[], location:tuple(country:chararray, zip:int));";
+
+    // PIG-4940
+    @Test
+    public void testSingleUnaryExpressionUnsupportedPushdown() throws Exception {
+        String q = query + "b = filter a by not browser#'type' is null;" +
+                "store b into 'out';";
+        test(q, Arrays.asList("browser"), Arrays.asList(OP_NOT, OP_NULL), null,
+                "(not (browser#'type' is null))", true);
+    }
+
+    // PIG-4953
+    @Test
+    public void testSingleUnaryExpressionSuccessfulPushdown() throws Exception {
+        String q = query + "b = filter a by mrkt is not null;" +
+                "store b into 'out';";
+        test(q, Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL),
+                "((mrkt is null) not)", null, true);
+    }
+
+    @Test
+    public void testUnaryAndExpressionSuccessfulPushdown() throws Exception {
+        String q = query + "b = filter a by not (mrkt is null and mrkt == 'us');" +
+                "store b into 'out';";
+        test(q, Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL, OP_AND, OP_EQ),
+                "(((mrkt is null) and (mrkt == 'us')) not)", null, true);
+    }
+
+    @Test
+    public void testUnaryAndExpressionUnsupportedPushdown() throws Exception {
+        String q = query + "b = filter a by not (mrkt is null and mrkt != 'us');" +
+                "store b into 'out';";
+        test(q, Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL, OP_AND, OP_EQ),
+                null, "(not ((mrkt is null) and (mrkt != us)))", true);
+    }
+
+    private PredicatePushDownFilterExtractor test(String query, List<String> predicateCols, List<Expression.OpType> supportedOpTypes,
+                                                  String expPushFilterString, String expFilterString, boolean unsupportedExpression)
+            throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+        PredicatePushDownFilterExtractor pushColExtractor = new PredicatePushDownFilterExtractor(
+                filter.getFilterPlan(), predicateCols, supportedOpTypes);
+        pushColExtractor.visit();
+
+        if (expPushFilterString == null) {
+            Assert.assertEquals("Checking partition column filter:", null,
+                    pushColExtractor.getPushDownExpression());
+        } else  {
+            Assert.assertEquals("Checking partition column filter:",
+                    expPushFilterString,
+                    pushColExtractor.getPushDownExpression().toString());
+        }
+
+        if (expFilterString == null) {
+            Assert.assertTrue("Check that filter can be removed:",
+                    pushColExtractor.isFilterRemovable());
+        } else {
+            if (unsupportedExpression) {
+                String actual = TestNewPartitionFilterPushDown.getTestExpression(
+                        (LogicalExpression)pushColExtractor.getFilteredPlan().getSources().get(0)).toString();
+                Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
+            } else {
+                String actual = pushColExtractor.getExpression(
+                        (LogicalExpression)pushColExtractor.getFilteredPlan().getSources().get(0)).toString();
+                Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
+            }
+        }
+        return pushColExtractor;
+    }
+
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java Wed Feb 22 09:43:41 2017
@@ -21,8 +21,10 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -46,6 +48,7 @@ public class TestPOGenerate {
     DataBag cogroup;
     DataBag partialFlatten;
     DataBag simpleGenerate;
+    DataBag mapFlatten;
     Random r = new Random();
     BagFactory bf = BagFactory.getInstance();
     TupleFactory tf = TupleFactory.getInstance();
@@ -54,10 +57,25 @@ public class TestPOGenerate {
     public void setUp() throws Exception {
         Tuple [] inputA = new Tuple[4];
         Tuple [] inputB = new Tuple[4];
+        Tuple [] inputC = new Tuple[4];
         for(int i = 0; i < 4; i++) {
             inputA[i] = tf.newTuple(2);
             inputB[i] = tf.newTuple(1);
+            inputC[i] = tf.newTuple(2);
         }
+        Map map0 = new HashMap<String,String>();
+        Map map1 = new HashMap<String,String>();
+        Map map2 = new HashMap<String,String>();
+        Map map3 = new HashMap<String,String>();
+        map0.put("A","");
+        map0.put("B","");
+        map1.put("A","a");
+        map1.put("B","b");
+        map2.put("A","aa");
+        map2.put("B","bb");
+        map3.put("A","aaa");
+        map3.put("B","bbb");
+
         inputA[0].set(0, 'a');
         inputA[0].set(1, '1');
         inputA[1].set(0, 'b');
@@ -70,6 +88,15 @@ public class TestPOGenerate {
         inputB[1].set(0, 'b');
         inputB[2].set(0, 'a');
         inputB[3].set(0, 'd');
+        inputC[0].set(0, 0);
+        inputC[0].set(1, map0);
+        inputC[1].set(0, 1);
+        inputC[1].set(1, map1);
+        inputC[2].set(0, 2);
+        inputC[2].set(1, map2);
+        inputC[3].set(0, 3);
+        inputC[3].set(1, map3);
+
         DataBag cg11 = bf.newDefaultBag();
         cg11.add(inputA[0]);
         cg11.add(inputA[2]);
@@ -119,15 +146,22 @@ public class TestPOGenerate {
         tPartial[3].append(emptyBag);
 
         partialFlatten = bf.newDefaultBag();
-        for(int i = 0; i < 4; ++i) {
+        for (int i = 0; i < 4; ++i) {
             partialFlatten.add(tPartial[i]);
         }
 
         simpleGenerate = bf.newDefaultBag();
-        for(int i = 0; i < 4; ++i) {
+        for (int i = 0; i < 4; ++i) {
             simpleGenerate.add(inputA[i]);
         }
 
+
+        mapFlatten = bf.newDefaultBag();
+        for (int i = 0; i < inputC.length; ++i) {
+            mapFlatten.add(inputC[i]);
+        }
+
+
         //System.out.println("Cogroup : " + cogroup);
         //System.out.println("Partial : " + partialFlatten);
         //System.out.println("Simple : " + simpleGenerate);
@@ -248,4 +282,49 @@ public class TestPOGenerate {
         assertEquals(simpleGenerate.size(), count);
 
     }
+
+    @Test
+    public void testMapFlattenGenerate() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.INTEGER);
+        prj2.setResultType(DataType.MAP);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(false);
+        toBeFlattened.add(true);
+        PhysicalPlan plan1 = new PhysicalPlan();
+        plan1.add(prj1);
+        PhysicalPlan plan2 = new PhysicalPlan();
+        plan2.add(prj2);
+        List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
+
+        List<String> obtained = new LinkedList<String>();
+        for (Tuple t : mapFlatten) {
+            poGen.attachInput(t);
+            Result output = poGen.getNextTuple();
+            while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                //System.out.println(output.result);
+                obtained.add(((Tuple) output.result).toString());
+                output = poGen.getNextTuple();
+            }
+        }
+
+        int count = 0;
+        for (Tuple t : mapFlatten) {
+            Tuple expected = tf.newTuple(3);
+            expected.set(0, t.get(0));
+            for (Object entryObj : ((Map)t.get(1)).entrySet()){
+                Map.Entry entry = ((Map.Entry)entryObj);
+                expected.set(1, entry.getKey());
+                expected.set(2, entry.getValue());
+                assertTrue(obtained.contains(expected.toString()));
+                ++count;
+            }
+        }
+        assertEquals(mapFlatten.size()*2, count);
+
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java Wed Feb 22 09:43:41 2017
@@ -19,6 +19,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,6 +43,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
 
 public class TestParamSubPreproc {
@@ -52,24 +54,9 @@ public class TestParamSubPreproc {
     private FileInputStream pigExResultStream;
     private String basedir = "test/org/apache/pig/test/data";
 
-    /* Test case 1
-     * Use a parameter within a pig script and provide value on the command line.
-     */
-    @Test
-    public void testCmdlineParam() throws Exception{
-        log.info("Starting test testCmdlineParam() ...");
-        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
-        pigIStream = new BufferedReader(new FileReader(basedir + "/input1.pig"));
-        pigOStream = new FileWriter(basedir + "/output1.pig");
-
-        String[] arg = {"date=20080228"};
-        String[] argFiles = null;
-        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
-
-        FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
-        pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
+    private void compareResults(InputStream expected, InputStream result) throws IOException {
+        BufferedReader inExpected = new BufferedReader(new InputStreamReader(expected));
+        BufferedReader inResult = new BufferedReader(new InputStreamReader(result));
 
         String exLine;
         String resLine;
@@ -89,6 +76,26 @@ public class TestParamSubPreproc {
 
         inExpected.close();
         inResult.close();
+    }
+
+    /* Test case 1
+     * Use a parameter within a pig script and provide value on the command line.
+     */
+    @Test
+    public void testCmdlineParam() throws Exception{
+        log.info("Starting test testCmdlineParam() ...");
+        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
+        pigIStream = new BufferedReader(new FileReader(basedir + "/input1.pig"));
+        pigOStream = new FileWriter(basedir + "/output1.pig");
+
+        String[] arg = {"date=20080228"};
+        String[] argFiles = null;
+        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
+
+        FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
+        pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
+
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
 
@@ -110,27 +117,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution from config file failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution from config file failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -152,27 +140,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResultDefault.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -236,27 +205,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution within a value failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution within a value failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -280,27 +230,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -321,27 +253,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResultCmdLnPriorDeclare.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution of command line arg. prior to declare stmt failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution of command line arg. prior to declare stmt failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -364,27 +277,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution for a command with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution for a command with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -406,27 +301,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -446,27 +323,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -487,27 +346,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Same Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Same Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -527,27 +368,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
     }
 
     /* Test case 15,16
@@ -615,27 +438,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 21
@@ -654,27 +458,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
 
@@ -693,27 +478,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
+        compareResults(pigExResultStream, pigResultStream);
 
-        inExpected.close();
-        inResult.close();
     }
 
     /* Test case 23
@@ -732,27 +499,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 24
@@ -771,27 +519,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResultMulDecs.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 25
@@ -809,27 +538,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 26
@@ -847,27 +557,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
 
@@ -886,27 +577,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 29
@@ -924,27 +596,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 30
@@ -962,27 +615,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult2.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case 31
@@ -1001,27 +635,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -1042,27 +657,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/inputNoVars.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
+        compareResults(pigExResultStream, pigResultStream);
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
         log.info("Done");
     }
 
@@ -1083,27 +680,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult3.txt");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
 
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
 
         log.info("Done");
     }
@@ -1123,27 +701,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResultComment.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     /* Test case
@@ -1212,27 +771,7 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output26.pig");
         InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8"));
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(expected));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(expected, pigResultStream);
 
         log.info("Done");
     }
@@ -1252,27 +791,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResultDollarSign.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
 
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
     }
 
     @Test
@@ -1289,28 +809,8 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult6.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
-
-        inExpected.close();
-        inResult.close();
 
+        compareResults(pigExResultStream, pigResultStream);
         log.info("Done");
     }
 
@@ -1326,27 +826,9 @@ public class TestParamSubPreproc {
 
         FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
         pigExResultStream = new FileInputStream(basedir + "/ExpectedResult7.pig");
-        BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream));
-        BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream));
-
-        String exLine;
-        String resLine;
-        int lineNum=0;
-
-        while (true) {
-            lineNum++;
-            exLine = inExpected.readLine();
-            resLine = inResult.readLine();
-            if (exLine==null || resLine==null)
-                break;
-            assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim());
-        }
-        if (!(exLine==null && resLine==null)) {
-            fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum);
-        }
 
-        inExpected.close();
-        inResult.close();
+        compareResults(pigExResultStream, pigResultStream);
+        log.info("Done");
     }
 
     @Test
@@ -1398,6 +880,351 @@ public class TestParamSubPreproc {
         assertEquals(resultContent, "daniel\t10\njenny\t20\n");
     }
 
+    @Test
+    public void testCommandLineParamOverwritingDefault() throws Exception {
+        log.info("Starting test testCommandLineParamOverwritingDefault()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        output1.delete();
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output /invalidpathThatShouldFail;",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 "-p", "output=" + output1.getAbsolutePath(),
+                                 script1.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertTrue("Default param should be overridden by the commandline param",
+                     output1.getAbsolutePath().endsWith(stats.getOutputNames().get(0)));
+    }
+
+    @Test
+    public void testRunWithParamOverwritingDefault() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        File output2 = File.createTempFile("output2_", "");
+        output1.delete();
+        output2.delete();
+
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output '" + output2.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File mainscript = Util.createFile("mainscript.pig",
+                  new String[] {"run -param output=" + output1.getAbsolutePath()
+                                + " " + script1.getAbsolutePath() + ";"});
+
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 mainscript.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+        Util.deleteDirectory(output2);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertEquals("There should only be 1 output.",
+                     1, stats.getOutputNames().size());
+        assertEquals("Output name should be from output1 and not output2",
+                     output1.getAbsolutePath(),
+                     stats.getOutputLocations().get(0));
+    }
+
+    @Test
+    public void testScopeOfParamWithRunCommand() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        File output2 = File.createTempFile("output2_", "");
+        output1.delete();
+        output2.delete();
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output '" + output1.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "%default output '" + output2.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File mainscript = Util.createFile("mainscript.pig",
+                  new String[] { "run " + script1.getAbsolutePath() + ";",
+                                 "run " + script2.getAbsolutePath() + ";" });
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 mainscript.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+        Util.deleteDirectory(output2);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertNotEquals("Two output paths should differ",
+               stats.getOutputNames().get(0), stats.getOutputNames().get(1));
+        assertEquals("Each output should contain 2 records",
+                      2, stats.getOutputStats().get(0).getNumberRecords());
+        assertEquals("Each output should contain 2 records",
+                      2, stats.getOutputStats().get(1).getNumberRecords());
+    }
+
+    @Test
+    public void testScopeOfParamWithNestedRunCommand() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        /*
+         * script1 sets a=1, b=2, c=3; calls script2
+         *   script2 sets b=22 (by -param); calls script3
+         *     script3 sets c=333; saves $a$b$c  (122333)
+         *   script2 saves $a$b$c (1223)
+         * script1 saves $a$b$c (123)
+         */
+        File script3 = Util.createFile("runscript3.pig",
+                  new String[] { "%declare c '333';",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "run " + script3.getAbsolutePath() + ";",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%declare a '1';",
+                                 "%declare b '2';",
+                                 "%declare c '3';",
+                                 "run -param b=22 " + script2.getAbsolutePath() + ";",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 script1.getAbsolutePath()} , null);
+
+        for( String output : stats.getOutputNames() ) {
+            assertTrue(output.contains("testScopeOfParamWithNestedRunCommand"));
+            Util.deleteDirectory(new File(output));
+        }
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertEquals("There should be three outputs.", 3, stats.getOutputNames().size());
+
+        for( String expectedoutput : new String [] {"testScopeOfParamWithNestedRunCommand123",
+                                            "testScopeOfParamWithNestedRunCommand1223",
+                                            "testScopeOfParamWithNestedRunCommand122333"} ) {
+            boolean found=false;
+            for( String output : stats.getOutputNames() ) {
+                if( output.endsWith(expectedoutput) ) {
+                    found=true;
+                }
+            }
+            assertTrue("Output " + expectedoutput + " should exist.", found);
+        }
+    }
+
+    /*  This currently does not work since PigMacro only picks the
+     *  param setting from the root script (script1)
+     *  To revisit after Grunt moves to ANTLR in PIG-2597.
+     *  Tracking in PIG-5028.
+     *
+
+    @Test
+    public void testScopeOfParamWithMacro() throws Exception {
+        log.info("Starting test testScopeOfParamWithMacro()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File macro = Util.createFile("testmacro.pig",
+                  new String[] { "DEFINE mymacro (A) RETURNS void {",
+                                 "store $A into 'testScopeOfParamWithMacro${a}${b}${c}';",
+                                 "};"});
+
+        File script3 = Util.createFile("runscript3.pig",
+                  new String[] { "%declare c '333';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "%declare b '22';",
+                                 "import '" + macro.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                 "mymacro(a);",
+                                 "exec " + script3.getAbsolutePath() + ";"});
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%declare a '1';",
+                                 "%declare b '2';",
+                                 "%declare c '3';",
+                                 "exec " + script2.getAbsolutePath() + ";"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 script1.getAbsolutePath()} , null);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        Util.deleteDirectory(new File(stats.getOutputNames().get(0)));
+        assertEquals("There should be only 1 output.", 1, stats.getOutputNames().size());
+        assertTrue("Expected output testScopeOfParamWithMacro1223 but got " + stats.getOutputNames().get(0),
+                   stats.getOutputNames().get(0).equals("testScopeOfParamWithMacro1223"));
+    }
+    */
+
+
+    /*
+     *  Test parameter substition when register contains /* globbing
+     */
+    @Test
+    public void testSubstitutionWithRegisterGlobbing() throws Exception{
+        log.info("Starting test testSubstitutionWithRegisterGlobbing()");
+        final String queryString =
+                "register /abc/$regdir/*.jar;\n" +
+                "A = LOAD '$input' USING PigStorage ();\n" +
+                "STORE A INTO '$output';\n" +
+                " /* comment that would make register globbing to be part of the multi-line comment */\n";
+
+
+        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
+        pigIStream = new BufferedReader(
+                                        new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8"))));
+        pigOStream = new FileWriter(basedir + "/output1.pig");
+
+        String[] arg = {"input = 'input.txt'", "output = 'output.txt'", "regdir = 'def'"};
+        String[] argFiles = null;
+        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
+
+        FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
+
+        String expectedString = queryString.replaceAll("\\$input","input.txt")
+                                           .replaceAll("\\$output","output.txt")
+                                           .replaceAll("\\$regdir","def");
+        InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8"));
+
+        compareResults(expected, pigResultStream);
+
+        log.info("Done");
+    }
+
+    /*
+     *  Test parameter substition when load contains /* globbing
+     */
+    @Test
+    public void testSubstitutionWithLoadGlobbing() throws Exception{
+        log.info("Starting test testSubstitutionWithLoadGlobbing()");
+        final String queryString =
+                "A = LOAD '/zzz/*' USING PigStorage ();\n" +
+                "STORE A INTO '$output';\n" +
+                " /* comment that would make register globbing to be part of the multi-line comment */\n";
+
+
+        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
+        pigIStream = new BufferedReader(
+                                        new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8"))));
+        pigOStream = new FileWriter(basedir + "/output1.pig");
+
+        String[] arg = {"output = 'output.txt'"};
+        String[] argFiles = null;
+        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
+
+        FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
+
+        String expectedString = queryString.replaceAll("\\$output","output.txt");
+        InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8"));
+
+        compareResults(expected, pigResultStream);
+
+        log.info("Done");
+    }
+
+    @Test
+    public void testSubstitutionWithRedeclaration() throws Exception{
+        log.info("Starting test testSubstitutionWithRedeclaration()");
+        final String queryString =
+              "%declare output '/tmp/abc';\n" +
+              "%declare actualoutput '$output.out';\n" +
+              "A = load 'input.txt' ;\n" +
+              "store A into '$actualoutput';\n" +
+              "%declare output '/tmp/def';\n" +
+              "%declare actualoutput '$output.out';\n" +
+              "store A into '$actualoutput';";
+
+
+        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
+        pigIStream = new BufferedReader(
+                                        new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8"))));
+        pigOStream = new FileWriter(basedir + "/output1.pig");
+
+        String[] arg = {"output = 'output.txt'"};
+        String[] argFiles = null;
+        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
+
+        FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig");
+
+        String expectedString = queryString.replaceAll("%declare [0-9a-zA-Z.'/\\$; ]*\n",";\n")
+                                .replaceAll("\\$","")
+                                .replaceFirst("actualoutput","/tmp/abc.out")
+                                .replaceFirst("actualoutput","/tmp/def.out");
+        InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8"));
+
+        compareResults(expected, pigResultStream);
+
+        log.info("Done");
+    }
+
+    @Test
+    public void testSubstitutionWithRedeclaredShell() throws Exception{
+        log.info("Starting test testSubstitutionWithRedeclaredShell()");
+        final String queryString =
+              "A = load 'input.txt' ;\n" +
+              "%declare now `bash -c \"date +'%Y%m%d_%H:%M:%S'; sleep 1;\"`;\n" +
+              "store A into '$now';\n" +
+              "%declare now `bash -c \"date +'%Y%m%d_%H:%M:%S'; sleep 1;\"`;\n" +
+              "store A into '$now';\n";
+
+        ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
+        pigIStream = new BufferedReader(
+                                        new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8"))));
+        pigOStream = new FileWriter(basedir + "/output1.pig");
+
+        String[] arg = {"output = 'output.txt'"};
+        String[] argFiles = null;
+        ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles);
+
+        BufferedReader pigresult = new BufferedReader(new InputStreamReader(new FileInputStream(basedir + "/output1.pig")));
+
+
+        String [] filenames = new String [2];
+        int index=0;
+        String line;
+        while ((line = pigresult.readLine())!=null) {
+            if( line.startsWith("store A into") ) {
+                filenames[index++] = line.split(" ")[3];
+            }
+        }
+
+        assertEquals("There should be 2 store statements", 2, index);
+        assertNotEquals("Identical shell param should be reexecuted.",
+                     filenames[0],
+                     filenames[1]);
+        log.info("Done");
+    }
+
     @SuppressWarnings("resource")
     private BufferedReader WithConditionalReplacement(String filename, String orig, String dest, boolean replace) throws IOException {
         BufferedReader pigOrigIStream = new BufferedReader(new FileReader(filename));

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigContext.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigContext.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigContext.java Wed Feb 22 09:43:41 2017
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -270,7 +271,7 @@ public class TestPigContext {
         assertEquals(JOB_TRACKER,
                 pigServer.getPigContext().getProperties().getProperty(MRConfiguration.JOB_TRACKER));
         assertEquals(FS_NAME,
-                pigServer.getPigContext().getProperties().getProperty("fs.default.name"));
+                pigServer.getPigContext().getProperties().getProperty(FileSystem.FS_DEFAULT_NAME_KEY));
         assertEquals(TMP_DIR_PROP,
                 pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir"));
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Wed Feb 22 09:43:41 2017
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -35,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
@@ -62,7 +65,10 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class TestPigRunner {
 
@@ -73,6 +79,9 @@ public class TestPigRunner {
     private static final String OUTPUT_FILE = "output";
     private static final String PIG_FILE = "test.pig";
 
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         cluster = MiniGenericCluster.buildCluster();
@@ -803,10 +812,9 @@ public class TestPigRunner {
     }
 
     @Test
+    @Ignore
+    // Skip in hadoop 23 test, see PIG-2449
     public void classLoaderTest() throws Exception {
-        // Skip in hadoop 23 test, see PIG-2449
-        if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
-            return;
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
         w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");
@@ -1147,8 +1155,13 @@ public class TestPigRunner {
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             if (execType.equals("tez")) {
-                assertEquals(outputs.get(0).getNumberRecords(), 5);
-                assertEquals(outputs.get(1).getNumberRecords(), 2);
+                if( outputs.get(0).getLocation().endsWith("tmp/output") ) {
+                    assertEquals(2, outputs.get(0).getNumberRecords());
+                    assertEquals(5, outputs.get(1).getNumberRecords());
+                } else {
+                    assertEquals(5, outputs.get(0).getNumberRecords());
+                    assertEquals(2, outputs.get(1).getNumberRecords());
+                }
             } else {
                 for (OutputStats outstats : outputs) {
                     // the multi-output counters are disabled
@@ -1214,6 +1227,77 @@ public class TestPigRunner {
             Util.deleteFile(cluster, "tmp/output");
         }
     }
+
+    @Test
+    public void testStoredScriptContents() throws Exception {
+        String scriptContents = "sh echo success;\n";
+        FileUtils.writeStringToFile(new File(PIG_FILE), scriptContents);
+        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);
+
+        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
+        try {
+            runAndValidateStoredScriptContents(PIG_FILE, scriptContents);
+            runAndValidateStoredScriptContents(inputInDfs.toString(), scriptContents);
+        } finally {
+            FileUtils.deleteQuietly(new File(PIG_FILE));
+            Util.deleteQuietly(cluster, PIG_FILE);
+        }
+    }
+
+    @Test
+    public void testErrorLogUnderCustomDir() throws Exception {
+        try (PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE))) {
+            w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+            w.println("B = foreach A generate StringSize(a0);");
+            w.println("store B into '" + OUTPUT_FILE + "';");
+        }
+        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);
+
+        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
+        try {
+            runAndValidateCustomErrorLogDir(PIG_FILE);
+            runAndValidateCustomErrorLogDir(inputInDfs.toString());
+        } finally {
+            FileUtils.deleteQuietly(new File(PIG_FILE));
+            Util.deleteQuietly(cluster, PIG_FILE);
+        }
+    }
+
+    private void runAndValidateStoredScriptContents(String scriptPath, String expectedContents) {
+        PigStats stats = runPigLocally(scriptPath);
+        assertTrue(stats.isSuccessful());
+        assertEquals(expectedContents, stats.getScript());
+
+        stats = runPigLocally("-f", scriptPath);
+        assertTrue(stats.isSuccessful());
+        assertEquals(expectedContents, stats.getScript());
+    }
+
+    private void runAndValidateCustomErrorLogDir(String scriptPath) throws IOException {
+        File logsFolder = temporaryFolder.newFolder();
+        String logsPath = logsFolder.getAbsolutePath();
+        assertFileCountUnderDir(logsFolder, 0);
+
+        PigStats stats = runPigLocally("-l", logsPath, scriptPath);
+        assertFalse(stats.isSuccessful());
+        assertFileCountUnderDir(logsFolder, 1);
+
+        stats = runPigLocally("-l", logsPath, "-f", scriptPath);
+        assertFalse(stats.isSuccessful());
+        assertFileCountUnderDir(logsFolder, 2);
+    }
+
+    private void assertFileCountUnderDir(File directory, int expectedFileCount) throws IOException {
+        String[] files = directory.list();
+        assertNotNull(files);
+        assertEquals(expectedFileCount, files.length);
+    }
+
+    private PigStats runPigLocally(String... extraArgs) {
+        String[] args = ArrayUtils.addAll(new String[]{"-x", "local"}, extraArgs);
+        return PigRunner.run(args, new TestNotificationListener("local"));
+    }
+
     public static class TestNotificationListener implements PigProgressNotificationListener {
 
         private Map<String, int[]> numMap = new HashMap<String, int[]>();

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java Wed Feb 22 09:43:41 2017
@@ -30,6 +30,9 @@ import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -99,7 +102,7 @@ public class TestPigScriptParser {
 
     @Test
     public void testDefineUDF() throws Exception {
-        PigServer ps = new PigServer(ExecType.LOCAL);
+        PigServer ps = new PigServer(Util.getLocalTestMode());
         String inputData[] = {
                 "dshfdskfwww.xyz.com/sportsjoadfjdslpdshfdskfwww.xyz.com/sportsjoadfjdsl" ,
                 "kas;dka;sd" ,
@@ -156,6 +159,79 @@ public class TestPigScriptParser {
         }
     }
 
+    @Test
+    public void testBackSlashOnly() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        Data data = Storage.resetData(pig);
+        data.set("input", tuple("abc"), tuple("\\bcd"), tuple("'cde"), tuple("def\\\\"));
+
+        String query =
+            "A = load 'input' USING mock.Storage() as (a0:chararray);\n"
+            // java String is escaping "\" so the following line is equivalent of
+            // B = FILTER A by STARTSWITH(a0,'\\'); in the pig script
+            + "B = FILTER A by STARTSWITH(a0,'\\\\');\n"
+            + "store B into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> list = data.get("out");
+
+        assertEquals("There should be only one match", 1, list.size());
+        Tuple t = list.get(0);
+        assertEquals("result should have only one field", 1, t.size() );
+        assertEquals("\\bcd",(String) t.get(0));
+    }
+
+
+    @Test
+    public void testBackSlashSingleQuote() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        Data data = Storage.resetData(pig);
+        data.set("input", tuple("abc"), tuple("\\bcd"), tuple("'cde"), tuple("def\\\\"));
+
+        String query =
+            "A = load 'input' USING mock.Storage() as (a0:chararray);\n"
+            // java String is escaping "\" so the following line is equivalent of
+            // B = FILTER A by STARTSWITH(a0,'\''); in the pig script
+            + "B = FILTER A by STARTSWITH(a0,'\\'');\n"
+            + "store B into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> list = data.get("out");
+
+        assertEquals("There should be only one match", 1, list.size());
+        Tuple t = list.get(0);
+        assertEquals("result should have only one field", 1, t.size() );
+        assertEquals("'cde",(String) t.get(0));
+    }
+
+    @Test
+    public void testBackSlashReplace() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        Data data = Storage.resetData(pig);
+        //After java escaping, these tuples have
+        //'abc', '\bcd' and 'def\\' respectively
+        data.set("input", tuple("abc"), tuple("\\bcd"), tuple("def\\\\"));
+
+        String query =
+            "A = load 'input' USING mock.Storage() as (a0:chararray);\n"
+            // java String is escaping "\" so the following line is equivalent of
+            //"B = FOREACH A GENERATE REPLACE(a0,'\\\\','+');\n"
+            + "B = FOREACH A GENERATE REPLACE(a0,'\\\\\\\\','+');\n"
+            + "store B into 'out' using mock.Storage;" ;
+
+            // REPLACE(a0,'\\\\','+')
+            // --> Pig parser unescape and pass "\\" to REPLACE UDF.
+            // --> REPLACE UDF calls, Pattern.compile("\\"); which
+            // matches "\"
+
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> list = data.get("out");
+
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {"('abc')","('+bcd')", "('def++')"});
+        Util.checkQueryOutputsAfterSort(list, expectedRes);
+    }
     private void checkParsedConstContent(PigServer pigServer,
                                          PigContext pigContext,
                                          String query,

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java Wed Feb 22 09:43:41 2017
@@ -103,11 +103,7 @@ public class TestPigStatsMR extends Test
 
     private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception {
         MapReduceLauncher launcher = new MapReduceLauncher();
-        java.lang.reflect.Method compile = launcher.getClass()
-                .getDeclaredMethod("compile",
-                        new Class[] { PhysicalPlan.class, PigContext.class });
-        compile.setAccessible(true);
-        return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx });
+        return launcher.compile(pp,ctx);
     }
 
     private static String getAlias(MapReduceOper mro) throws Exception {