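You are viewing a plain-text version of this content. The link to the canonical version was lost in this plain-text rendering; the commit itself is available at http://svn.apache.org/r1573129.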
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/01 08:22:26 UTC

svn commit: r1573129 - in /pig/branches/tez: src/org/apache/pig/ test/ test/org/apache/pig/test/

Author: cheolsoo
Date: Sat Mar  1 07:22:25 2014
New Revision: 1573129

URL: http://svn.apache.org/r1573129
Log:
PIG-3784: Port more mini cluster tests to Tez (cheolsoo)

Modified:
    pig/branches/tez/src/org/apache/pig/PigServer.java
    pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
    pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
    pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java
    pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java
    pig/branches/tez/test/org/apache/pig/test/TestPigContext.java
    pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
    pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java
    pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
    pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
    pig/branches/tez/test/tez-tests

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Sat Mar  1 07:22:25 2014
@@ -244,13 +244,8 @@ public class PigServer {
         addJarsFromProperties();
         markPredeployedJarsFromProperties();
 
-        if (PigStats.get() == null) {
-            PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
-        }
-
-        if (ScriptState.get() == null) {
-            ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
-        }
+        PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+        ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
     }
 
     private void addJarsFromProperties() throws ExecException {

Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Sat Mar  1 07:22:25 2014
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
@@ -48,7 +47,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCombiner {
-
     private static MiniGenericCluster cluster;
     private static Properties properties;
 
@@ -74,14 +72,6 @@ public class TestCombiner {
         FileLocalizer.setInitialized(false);
     }
 
-    @After
-    public void tearDown() throws Exception {
-        // Nullify PigStats and ScriptState after every run to ensure new
-        // objects are instantiated for next run.
-        PigStats.start(null);
-        ScriptState.start(null);
-    }
-
     @Test
     public void testSuccessiveUserFuncs1() throws Exception {
         String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +

Modified: pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java Sat Mar  1 07:22:25 2014
@@ -47,7 +47,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestCustomPartitioner {
-
     private static MiniGenericCluster cluster;
     private static Properties properties;
     private static PigServer pigServer;

Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java Sat Mar  1 07:22:25 2014
@@ -28,14 +28,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.StringTokenizer;
 
-import junit.framework.Assert;
-
-import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -56,32 +53,37 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-@RunWith(JUnit4.class)
 public class TestEvalPipeline {
-    
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    private PigServer pigServer;
-    private PigContext pigContext;
-
-    TupleFactory mTf = TupleFactory.getInstance();
-    BagFactory mBf = BagFactory.getInstance();
-    
+    private static PigServer pigServer;
+    private static PigContext pigContext;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    private TupleFactory mTf = TupleFactory.getInstance();
+    private BagFactory mBf = BagFactory.getInstance();
+
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), properties);
         pigContext = pigServer.getPigContext();
     }
-    
+
+    @BeforeClass
+    public static void oneTimeSetup() {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     static public class MyBagFunction extends EvalFunc<DataBag>{
         @Override
         public DataBag exec(Tuple input) throws IOException {
@@ -93,35 +95,35 @@ public class TestEvalPipeline {
             return output;
         }
     }
-    
+
     @Test
     public void testFunctionInsideFunction() throws Exception{
         File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
 
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(f1.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(f1.toString(), pigContext)
                 + "' using " + PigStorage.class.getName() + "(':');");
         pigServer.registerQuery("b = foreach a generate 1-1/1;");
         Iterator<Tuple> iter  = pigServer.openIterator("b");
-        
+
         for (int i=0 ;i<3; i++){
-            Assert.assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
+            Assert.assertEquals(0.0d, DataType.toDouble(iter.next().get(0)).doubleValue(), 0.0d);
         }
     }
-    
+
     @Test
     public void testJoin() throws Exception{
         File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
         File f2 = Util.createFile(new String[]{"b","b","a"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(f1.toString(), pigContext) + "' using " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(f1.toString(), pigContext) + "' using "
                 + PigStorage.class.getName() + "(':');");
-        pigServer.registerQuery("b = load '" 
+        pigServer.registerQuery("b = load '"
                 + Util.generateURI(f2.toString(), pigContext) + "';");
-        pigServer.registerQuery("c = cogroup a by $0, b by $0;");        
+        pigServer.registerQuery("c = cogroup a by $0, b by $0;");
         pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-        
+
         Iterator<Tuple> iter = pigServer.openIterator("d");
         int count = 0;
         while(iter.hasNext()){
@@ -131,7 +133,7 @@ public class TestEvalPipeline {
         }
         Assert.assertEquals(count, 4);
     }
-    
+
     @Test
     public void testDriverMethod() throws Exception{
         File f = Util.createTempFileDelOnExit("tmp", "");
@@ -139,8 +141,8 @@ public class TestEvalPipeline {
         pw.println("a");
         pw.println("a");
         pw.close();
-        pigServer.registerQuery("a = foreach (load '" 
-                + Util.generateURI(f.toString(), pigContext) + "') " 
+        pigServer.registerQuery("a = foreach (load '"
+                + Util.generateURI(f.toString(), pigContext) + "') "
                 + "generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
         Iterator<Tuple> iter = pigServer.openIterator("a");
         int count = 0;
@@ -153,61 +155,61 @@ public class TestEvalPipeline {
         Assert.assertEquals(count, 6);
         f.delete();
     }
-    
+
     @Test
     public void testMapLookup() throws Exception {
         DataBag b = BagFactory.getInstance().newDefaultBag();
         Map<String, Object> colors = new HashMap<String, Object>();
         colors.put("apple","red");
         colors.put("orange","orange");
-        
+
         Map<String, Object> weights = new HashMap<String, Object>();
         weights.put("apple","0.1");
         weights.put("orange","0.3");
-        
+
         Tuple t = mTf.newTuple();
         t.append(colors);
         t.append(weights);
         b.add(t);
-        
+
         File tmpFile = File.createTempFile("tmp", "");
         tmpFile.delete(); // we only needed the temp file name, so delete the file
         String fileName = Util.removeColon(tmpFile.getAbsolutePath());
 
         PigFile f = new PigFile(fileName);
         f.store(b, new FuncSpec(BinStorage.class.getCanonicalName()),
-                pigServer.getPigContext());        
-        
+                pigServer.getPigContext());
+
         pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
         pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
         Iterator<Tuple> iter = pigServer.openIterator("b");
         t = iter.next();
         Assert.assertEquals(t.get(0).toString(), "red");
-        Assert.assertEquals(DataType.toDouble(t.get(1)), 0.3);
+        Assert.assertEquals(0.3d, DataType.toDouble(t.get(1)).doubleValue(), 0.0d);
         Assert.assertFalse(iter.hasNext());
         Util.deleteFile(cluster, fileName);
     }
-    
+
     static public class TitleNGrams extends EvalFunc<DataBag> {
-        
+
         @Override
-        public DataBag exec(Tuple input) throws IOException {    
+        public DataBag exec(Tuple input) throws IOException {
             try {
                 DataBag output = BagFactory.getInstance().newDefaultBag();
                 String str = input.get(0).toString();
-            
+
                 String title = str;
 
                 if (title != null) {
                     List<String> nGrams = makeNGrams(title);
-                    
+
                     for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
                         Tuple t = TupleFactory.getInstance().newTuple(1);
                         t.set(0, it.next());
                         output.add(t);
                     }
                 }
-    
+
                 return output;
             } catch (ExecException ee) {
                 IOException ioe = new IOException(ee.getMessage());
@@ -215,28 +217,28 @@ public class TestEvalPipeline {
                 throw ioe;
             }
         }
-        
-        
+
+
         List<String> makeNGrams(String str) {
             List<String> tokens = new ArrayList<String>();
-            
+
             StringTokenizer st = new StringTokenizer(str);
             while (st.hasMoreTokens())
                 tokens.add(st.nextToken());
-            
+
             return nGramHelper(tokens, new ArrayList<String>());
         }
-        
+
         ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
             if (str.size() == 0)
                 return nGrams;
-            
+
             for (int i = 0; i < str.size(); i++)
                 nGrams.add(makeString(str.subList(0, i+1)));
-            
+
             return nGramHelper(str.subList(1, str.size()), nGrams);
         }
-        
+
         String makeString(List<String> list) {
             StringBuffer sb = new StringBuffer();
             for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
@@ -288,18 +290,18 @@ public class TestEvalPipeline {
             myMap.put("map", mapInMap);
             myMap.put("tuple", tuple);
             myMap.put("bag", bag);
-            return myMap; 
+            return myMap;
         }
 
         public Schema outputSchema(Schema input) {
             return new Schema(new Schema.FieldSchema(null, DataType.MAP));
         }
     }
-    
+
     @Test
     public void testBagFunctionWithFlattening() throws Exception{
         File queryLogFile = Util.createFile(
-                    new String[]{ 
+                    new String[]{
                         "stanford\tdeer\tsighting",
                         "bush\tpresident",
                         "stanford\tbush",
@@ -309,66 +311,52 @@ public class TestEvalPipeline {
                         "stanford\tpresident",
                     }
                 );
-                
+
         File newsFile = Util.createFile(
                     new String[]{
                         "deer seen at stanford",
-                        "george bush visits stanford", 
-                        "yahoo hosting a conference in the bay area", 
+                        "george bush visits stanford",
+                        "yahoo hosting a conference in the bay area",
                         "who will win the world cup"
                     }
-                );    
-        
+                );
+
         Map<String, Integer> expectedResults = new HashMap<String, Integer>();
         expectedResults.put("bush", 2);
         expectedResults.put("stanford", 3);
         expectedResults.put("world", 1);
         expectedResults.put("conference", 1);
-        
-        pigServer.registerQuery("newsArticles = LOAD '" 
-                + Util.generateURI(newsFile.toString(), pigContext) 
+
+        pigServer.registerQuery("newsArticles = LOAD '"
+                + Util.generateURI(newsFile.toString(), pigContext)
                 + "' USING " + TextLoader.class.getName() + "();");
-        pigServer.registerQuery("queryLog = LOAD '" 
+        pigServer.registerQuery("queryLog = LOAD '"
                 + Util.generateURI(queryLogFile.toString(), pigContext) + "';");
 
         pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
         pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
         pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-        
+
         Iterator<Tuple> iter = pigServer.openIterator("answer");
         if(!iter.hasNext()) Assert.fail("No Output received");
         while(iter.hasNext()){
             Tuple t = iter.next();
-            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
+            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),
+                    (DataType.toDouble(t.get(0))).doubleValue(), 0.0d);
         }
     }
-    
-    /*    
-    @Test
-    public void testSort() throws Exception{
-        testSortDistinct(false, false);
-    }    
-    */    
 
     @Test
-    public void testSortWithUDF() throws Exception{
-        testSortDistinct(false, true);
-    }    
+    public void testSort() throws Exception{
+        testSortDistinct(false);
+    }
 
     @Test
     public void testDistinct() throws Exception{
-        testSortDistinct(true, false);
+        testSortDistinct(true);
     }
-    
-    public static class TupComp extends ComparisonFunc {
 
-        @Override
-        public int compare(Tuple t1, Tuple t2) {
-            return t1.compareTo(t2);
-        }
-    }
-
-    private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
+    private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
         int LOOP_SIZE = 1024*16;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -376,18 +364,14 @@ public class TestEvalPipeline {
         for(int i = 0; i < LOOP_SIZE; i++) {
             ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
         }
-        ps.close(); 
-        
-        pigServer.registerQuery("A = LOAD '" 
+        ps.close();
+
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         if (eliminateDuplicates){
             pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
         }else{
-            if(!useUDF) {
-                pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
-            } else {
-                pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
-            }
+            pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
         }
         Iterator<Tuple> iter = pigServer.openIterator("B");
         String last = "";
@@ -404,9 +388,9 @@ public class TestEvalPipeline {
                 Assert.assertEquals(t.size(), 2);
                 last = t.get(0).toString();
             }
-        }        
+        }
     }
-    
+
     @Test
     public void testNestedPlan() throws Exception{
         int LOOP_COUNT = 10;
@@ -420,7 +404,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -460,7 +444,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -502,7 +486,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = limit A 5;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -514,15 +498,15 @@ public class TestEvalPipeline {
         }
         Assert.assertEquals(5, numIdentity);
     }
-    
+
     @Test
     public void testComplexData() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
@@ -532,10 +516,10 @@ public class TestEvalPipeline {
         Assert.assertEquals("2", t.get(2).toString());
         Assert.assertEquals("value1", t.get(3).toString());
         Assert.assertEquals("value2", t.get(4).toString());
-        
+
         //test with BinStorage
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         String output = "/pig/out/TestEvalPipeline-testComplexData";
         pigServer.deleteFile(output);
@@ -549,17 +533,17 @@ public class TestEvalPipeline {
         Assert.assertEquals("1", t.get(1).toString());
         Assert.assertEquals("2", t.get(2).toString());
         Assert.assertEquals("value1", t.get(3).toString());
-        Assert.assertEquals("value2", t.get(4).toString());        
+        Assert.assertEquals("value2", t.get(4).toString());
     }
 
     @Test
     public void testBinStorageDetermineSchema() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
@@ -569,10 +553,10 @@ public class TestEvalPipeline {
         Assert.assertEquals(2, t.get(2));
         Assert.assertEquals("value1", t.get(3).toString());
         Assert.assertEquals("value2", t.get(4).toString());
-        
+
         //test with BinStorage
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
         pigServer.deleteFile(output);
@@ -587,7 +571,7 @@ public class TestEvalPipeline {
         String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b as t2b, m#'key1', m#'key2', b;",
                 "q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
                 "q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
-        
+
         for (int i = 0; i < loads.length; i++) {
             pigServer.registerQuery(loads[i]);
             pigServer.registerQuery(generates[i]);
@@ -605,18 +589,18 @@ public class TestEvalPipeline {
             for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
                 Tuple bt = bit.next();
                 Assert.assertEquals(String.class, bt.get(0).getClass());
-                Assert.assertEquals(String.class, bt.get(1).getClass());            
+                Assert.assertEquals(String.class, bt.get(1).getClass());
             }
-        }        
+        }
     }
 
     @Test
     public void testProjectBag() throws IOException, ExecException {
         // This tests make sure that when a bag with multiple columns is
         // projected all columns apear in the output
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"f1\tf2\tf3"});
-        pigServer.registerQuery("a = load '" 
+        pigServer.registerQuery("a = load '"
                 + Util.generateURI(input.toString(), pigContext) + "' as (x, y, z);");
         pigServer.registerQuery("b = group a by x;");
         pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
@@ -630,11 +614,11 @@ public class TestEvalPipeline {
     @Test
     public void testBinStorageDetermineSchema2() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"pigtester\t10\t1.2"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
         pigServer.deleteFile(output);
@@ -649,7 +633,7 @@ public class TestEvalPipeline {
         String[] generates = {"q = foreach p generate name, age, gpa;",
                 "q = foreach p generate name, age, gpa;",
                 "q = foreach p generate $0, $1, $2;"};
-        
+
         for (int i = 0; i < loads.length; i++) {
             pigServer.registerQuery(loads[i]);
             pigServer.registerQuery(generates[i]);
@@ -662,7 +646,7 @@ public class TestEvalPipeline {
             Assert.assertEquals(1.2, t.get(2));
             Assert.assertEquals(Double.class, t.get(2).getClass());
         }
-        
+
         // test that valid casting is allowed
         pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
                 " as (name, age:long, gpa:float);");
@@ -675,7 +659,7 @@ public class TestEvalPipeline {
         Assert.assertEquals(Long.class, t.get(1).getClass());
         Assert.assertEquals(1.2f, t.get(2));
         Assert.assertEquals(Float.class, t.get(2).getClass());
-        
+
         // test that implicit casts work
         pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
         " as (name, age, gpa);");
@@ -689,15 +673,15 @@ public class TestEvalPipeline {
         Assert.assertEquals(1, t.get(2));
         Assert.assertEquals(Integer.class, t.get(2).getClass());
     }
-    
+
     @Test
     public void testCogroupWithInputFromGroup() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
-                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2", 
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
                 "pigtester2\t10\t1.2",
                 "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
-        
+
         Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
         // we will in essence be doing a group on first column and getting
         // SUM over second column and a count for the group - store
@@ -705,13 +689,13 @@ public class TestEvalPipeline {
         resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
         resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
         resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
-        pigServer.registerQuery("c = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("c = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("d = cogroup b by group, c by name;");
         pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
@@ -719,27 +703,28 @@ public class TestEvalPipeline {
         for(int i = 0; i < resultMap.size(); i++) {
             Tuple t = it.next();
             Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
-            Pair<Long, Long> output = resultMap.get(t.get(0)); 
+            Pair<Long, Long> output = resultMap.get(t.get(0));
             Assert.assertEquals(output.first, t.get(1));
             Assert.assertEquals(output.second, t.get(2));
         }
     }
-    
+
     @Test
     public void testUtf8Dump() throws IOException, ExecException {
-        
+
         // Create input file with unicode data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"wendyξ"});
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext)
                 + "' using PigStorage() " + "as (name:chararray);");
         Iterator<Tuple> it = pigServer.openIterator("a");
         Tuple t = it.next();
         Assert.assertEquals("wendyξ", t.get(0));
-        
+
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testMapUDF() throws Exception{
         int LOOP_COUNT = 2;
@@ -753,7 +738,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
         String query = "C = foreach B {"
@@ -793,7 +778,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
         String query = "C = foreach B {"
@@ -811,19 +796,19 @@ public class TestEvalPipeline {
 
     @Test
     public void testLoadCtorArgs() throws IOException, ExecException {
-        
+
         // Create input file
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"hello:world"});
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext)
                 + "' using org.apache.pig.test.PigStorageNoDefCtor(':');");
         pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
         Assert.assertEquals("hello", t.get(0));
         Assert.assertEquals("world", t.get(1));
-        
+
     }
 
     @Test
@@ -839,7 +824,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -881,7 +866,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -928,7 +913,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -969,7 +954,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A ;"); //the argument does not matter
         pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter
@@ -990,7 +975,7 @@ public class TestEvalPipeline {
 
         Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
-    
+
     @Test
     public void testCogroupAfterDistinct() throws Exception {
         String[] input1 = {
@@ -1011,13 +996,13 @@ public class TestEvalPipeline {
             };
         Util.createInputFile(cluster, "table1", input1);
         Util.createInputFile(cluster, "table2", input2);
-        
+
         pigServer.registerQuery("nonuniqtable1 = LOAD 'table1' AS (f1:chararray);");
         pigServer.registerQuery("table1 = DISTINCT nonuniqtable1;");
         pigServer.registerQuery("table2 = LOAD 'table2' AS (f1:chararray, f2:int);");
         pigServer.registerQuery("temp = COGROUP table1 BY f1 INNER, table2 BY f1;");
         Iterator<Tuple> it = pigServer.openIterator("temp");
-        
+
         // results should be:
         // (abc,{(abc)},{})
         // (def,{(def)},{})
@@ -1025,7 +1010,7 @@ public class TestEvalPipeline {
         HashMap<String, Tuple> results = new HashMap<String, Tuple>();
         Object[] row = new Object[] { "abc",
                 Util.createBagOfOneColumn(new String[] { "abc"}), mBf.newDefaultBag() };
-        results.put("abc", Util.createTuple(row)); 
+        results.put("abc", Util.createTuple(row));
         row = new Object[] { "def",
                 Util.createBagOfOneColumn(new String[] { "def"}), mBf.newDefaultBag() };
         results.put("def", Util.createTuple(row));
@@ -1044,16 +1029,16 @@ public class TestEvalPipeline {
                 Assert.assertEquals(expected.get(i++), field);
             }
         }
-        
+
         Util.deleteFile(cluster, "table1");
         Util.deleteFile(cluster, "table2");
     }
 
     @Test
     public void testAlgebraicDistinctProgress() throws Exception {
-    
+
         //creating a test input of larger than 1000 to make
-        //sure that progress kicks in. The only way to test this 
+        //sure that progress kicks in. The only way to test this
         //is to add a log statement to the getDistinct
         //method in Distinct.java. There is no automated mechanism
         //to check this from pig
@@ -1066,66 +1051,66 @@ public class TestEvalPipeline {
             inpString[i] = new Integer(i/2).toString();
             inpString[i+1] = new Integer(i/2).toString();
         }
-               
+
         Util.createInputFile(cluster, "table", inpString);
 
         pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
         pigServer.registerQuery("b = group a ALL;");
         pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
         Iterator<Tuple> it = pigServer.openIterator("c");
-     
+
         Integer[] exp = new Integer[inputSize/2];
         for(int j = 0; j < inputSize/2; ++j) {
             exp[j] = j;
         }
 
         DataBag expectedBag = Util.createBagOfOneColumn(exp);
-        
+
         while(it.hasNext()) {
             Tuple tup = it.next();
             Long resultBagSize = (Long)tup.get(0);
             Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
         }
-        
-        Util.deleteFile(cluster, "table");        
+
+        Util.deleteFile(cluster, "table");
     }
 
     @Test
     public void testBinStorageWithLargeStrings() throws Exception {
         // Create input file with large strings
-    	int testSize = 100;
-    	String[] stringArray = new String[testSize];
-    	Random random = new Random();
-    	stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
-    	for(int i = 1; i < stringArray.length; ++i) {
-    		//generate a few large strings every 25th record
-    		if((i % 25) == 0) {
-    			stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);    			
-    		} else {
-    			stringArray[i] = GenRandomData.genRandString(random);
-    		}
-    	}
-        
-    	Util.createInputFile(cluster, "table", stringArray);
-        
-    	//test with BinStorage
+        int testSize = 100;
+        String[] stringArray = new String[testSize];
+        Random random = new Random();
+        stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
+        for(int i = 1; i < stringArray.length; ++i) {
+            //generate a few large strings every 25th record
+            if((i % 25) == 0) {
+                stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
+            } else {
+                stringArray[i] = GenRandomData.genRandString(random);
+            }
+        }
+
+        Util.createInputFile(cluster, "table", stringArray);
+
+        //test with BinStorage
         pigServer.registerQuery("a = load 'table' using PigStorage() " +
                 "as (c: chararray);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageLargeStrings";
         pigServer.deleteFile(output);
         pigServer.store("a", output, BinStorage.class.getName());
-        
+
         pigServer.registerQuery("b = load '" + output +"' using BinStorage() " +
-        "as (c:chararray);");
+                "as (c:chararray);");
         pigServer.registerQuery("c = foreach b generate c;");
-        
+
         Iterator<Tuple> it = pigServer.openIterator("c");
         int counter = 0;
         while(it.hasNext()) {
             Tuple tup = it.next();
             String resultString = (String)tup.get(0);
             String expectedString = stringArray[counter];
-          	Assert.assertTrue(expectedString.equals(resultString));
+            Assert.assertTrue(expectedString.equals(resultString));
             ++counter;
         }
         Util.deleteFile(cluster, "table");

Modified: pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java Sat Mar  1 07:22:25 2014
@@ -18,172 +18,182 @@
 
 package org.apache.pig.test;
 
+import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 
-import junit.framework.Assert;
-
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNestedForeach {
-	static MiniCluster cluster = MiniCluster.buildCluster();
+    private static PigServer pig ;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    @Before
+    public void setup() throws IOException {
+        pig = new PigServer(cluster.getExecType(), properties);
+    }
+
+    @BeforeClass
+    public static void oneTimeSetup() {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testNestedForeachProj() throws Exception {
+        String[] input = {
+                "1\t2",
+                "2\t7",
+                "1\t3"
+        };
+
+        Util.createInputFile(cluster, "table_nf_proj", input);
+
+        pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
 
-	private PigServer pig ;
+        Iterator<Tuple> iter = pig.openIterator("c");
+        String[] expected = new String[] {"({(2),(3)})", "({(7)})"};
 
-	public TestNestedForeach() throws Throwable {
-		pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()) ;
-	}
-
-	Boolean[] nullFlags = new Boolean[]{ false, true };
-
-	@AfterClass
-	public static void oneTimeTearDown() throws Exception {
-		cluster.shutDown();
-	}
-
-	@Test
-	public void testNestedForeachProj() throws Exception {
-		String[] input = {
-				"1\t2",
-				"2\t7",
-				"1\t3"
-		};
-
-		Util.createInputFile(cluster, "table_nf_proj", input);
-
-		pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
-
-		Iterator<Tuple> iter = pig.openIterator("c");
-		String[] expected = new String[] {"({(2),(3)})", "({(7)})"};
-
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
-    
-	}
-
-	@Test
-	public void testNestedForeachExpression() throws Exception {
-		String[] input = {
-				"1\t2",
-				"2\t7",
-				"1\t3"
-		};
-
-		Util.createInputFile(cluster, "table_nf_expr", input);
-
-		pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+
+    }
+
+    @Test
+    public void testNestedForeachExpression() throws Exception {
+        String[] input = {
+                "1\t2",
+                "2\t7",
+                "1\t3"
+        };
+
+        Util.createInputFile(cluster, "table_nf_expr", input);
+
+        pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+
+        Iterator<Tuple> iter = pig.openIterator("c");
 
-		Iterator<Tuple> iter = pig.openIterator("c");
-		
         String[] expected = new String[] {"({(4),(6)})", "({(14)})"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
-	}
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+    }
 
-	@Test
-	public void testNestedForeachUDF() throws Exception {
-		String[] input = {
-				"1\thello",
-				"2\tpig",
-				"1\tworld"
-		};
-
-		Util.createInputFile(cluster, "table_nf_udf", input);
-
-		pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+    @Test
+    public void testNestedForeachUDF() throws Exception {
+        String[] input = {
+                "1\thello",
+                "2\tpig",
+                "1\tworld"
+        };
+
+        Util.createInputFile(cluster, "table_nf_udf", input);
+
+        pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+
+        Iterator<Tuple> iter = pig.openIterator("c");
 
-		Iterator<Tuple> iter = pig.openIterator("c");
-		
         String[] expected = new String[] {"({(HELLO),(WORLD)})", "({(PIG)})"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
-	}
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+    }
 
-	@Test
-	public void testNestedForeachFlatten() throws Exception {
-		String[] input = {
-				"1\thello world pig",
-				"2\thadoop world",
-				"1\thello pig"
-		};
-
-		Util.createInputFile(cluster, "table_nf_flatten", input);
-
-		pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
-
-		Iterator<Tuple> iter = pig.openIterator("c");
-		
-        String[] expected = new String[] {"({(hello),(world),(pig),(hello),(pig)})", 
+    @Test
+    public void testNestedForeachFlatten() throws Exception {
+        String[] input = {
+                "1\thello world pig",
+                "2\thadoop world",
+                "1\thello pig"
+        };
+
+        Util.createInputFile(cluster, "table_nf_flatten", input);
+
+        pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
+
+        Iterator<Tuple> iter = pig.openIterator("c");
+
+        String[] expected = new String[] {"({(hello),(world),(pig),(hello),(pig)})",
                 "({(hadoop),(world)})"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
-	}
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+    }
+
+    @Test
+    public void testNestedForeachInnerFilter() throws Exception {
+        String[] input = {
+                "1\t2",
+                "2\t7",
+                "1\t3"
+        };
+
+        Util.createInputFile(cluster, "table_nf_filter", input);
+
+        pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { " +
+                " c1 = filter a by a1 >= 3; " +
+                " c2 = foreach c1 generate a1; " +
+                " generate c2; " +
+        " }\n");
+
+        Iterator<Tuple> iter = pig.openIterator("c");
+        Tuple t = iter.next();
+        Assert.assertTrue(t.toString().equals("({(3)})"));
+
+        t = iter.next();
+        Assert.assertTrue(t.toString().equals("({(7)})"));
+    }
+
+    @Test
+    public void testNestedForeachInnerOrder() throws Exception {
+        String[] input = {
+                "1\t3",
+                "2\t7",
+                "1\t2"
+        };
+
+        Util.createInputFile(cluster, "table_nf_order", input);
+
+        pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
+        pig.registerQuery("b = group a by a0;\n");
+        pig.registerQuery("c = foreach b { " +
+                " c1 = order a by a1; " +
+                " c2 = foreach c1 generate a1; " +
+                " generate c2; " +
+        " }\n");
+
+        Iterator<Tuple> iter = pig.openIterator("c");
+        Tuple t = iter.next();
+        Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+        t = iter.next();
+        Assert.assertTrue(t.toString().equals("({(7)})"));
+    }
 
-	@Test
-	public void testNestedForeachInnerFilter() throws Exception {
-		String[] input = {
-				"1\t2",
-				"2\t7",
-				"1\t3"
-		};
-
-		Util.createInputFile(cluster, "table_nf_filter", input);
-
-		pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { " +
-				" c1 = filter a by a1 >= 3; " +
-				" c2 = foreach c1 generate a1; " +
-				" generate c2; " +
-		" }\n");
-
-		Iterator<Tuple> iter = pig.openIterator("c");
-		Tuple t = iter.next();
-		Assert.assertTrue(t.toString().equals("({(3)})"));
-
-		t = iter.next();
-		Assert.assertTrue(t.toString().equals("({(7)})"));
-	}
-
-	@Test
-	public void testNestedForeachInnerOrder() throws Exception {
-		String[] input = {
-				"1\t3",
-				"2\t7",
-				"1\t2"
-		};
-
-		Util.createInputFile(cluster, "table_nf_order", input);
-
-		pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
-		pig.registerQuery("b = group a by a0;\n");
-		pig.registerQuery("c = foreach b { " +
-				" c1 = order a by a1; " +
-				" c2 = foreach c1 generate a1; " +
-				" generate c2; " +
-		" }\n");
-
-		Iterator<Tuple> iter = pig.openIterator("c");
-		Tuple t = iter.next();
-		Assert.assertTrue(t.toString().equals("({(2),(3)})"));
-
-		t = iter.next();
-		Assert.assertTrue(t.toString().equals("({(7)})"));
-	}
-	
-	// See PIG-2563
-	@Test
+    // See PIG-2563
+    @Test
     public void testNestedForeach() throws Exception {
         String[] input = {
                 "1\t2\t3",

Modified: pig/branches/tez/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigContext.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigContext.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigContext.java Sat Mar  1 07:22:25 2014
@@ -34,6 +34,8 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.JavaCompilerHelper;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -45,13 +47,16 @@ public class TestPigContext {
     private static final String FS_NAME = "file:///";
     private static final String JOB_TRACKER = "local";
 
+    private static PigContext pigContext;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
     private File input;
-    private PigContext pigContext;
-    static MiniCluster cluster = null;
 
     @BeforeClass
     public static void oneTimeSetup() {
-        cluster = MiniCluster.buildCluster();
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
     }
 
     @Before
@@ -60,6 +65,11 @@ public class TestPigContext {
         input = File.createTempFile("PigContextTest-", ".txt");
     }
 
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
     /**
      * Passing an already configured pigContext in PigServer constructor.
      */
@@ -142,8 +152,7 @@ public class TestPigContext {
         int status = Util.executeJavaCommand("jar -cf " + jarFile +
                 " -C " + tmpDir.getAbsolutePath() + " " + "com");
         assertEquals(0, status);
-        Properties properties = cluster.getProperties();
-        PigContext localPigContext = new PigContext(ExecType.MAPREDUCE, properties);
+        PigContext localPigContext = new PigContext(cluster.getExecType(), properties);
 
         // register jar using properties
         localPigContext.getProperties().setProperty("pig.additional.jars", jarFile);
@@ -216,16 +225,6 @@ public class TestPigContext {
         pc.getScriptFiles().remove("test/path-1824");
     }
 
-    @After
-    public void tearDown() throws Exception {
-        input.delete();
-    }
-
-    @AfterClass
-    public static void oneTimeTearDown() throws Exception {
-        cluster.shutDown();
-    }
-
     private static Properties getProperties() {
         Properties props = new Properties();
         props.put("mapred.job.tracker", JOB_TRACKER);

Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Sat Mar  1 07:22:25 2014
@@ -72,9 +72,12 @@ import org.apache.pig.impl.util.Properti
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
@@ -83,15 +86,14 @@ import org.w3c.dom.NodeList;
 import com.google.common.io.Files;
 
 public class TestPigServer {
-    private PigServer pig = null;
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
     private File tempDir;
 
     @Before
     public void setUp() throws Exception{
         FileLocalizer.setInitialized(false);
-        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
         tempDir = Files.createTempDir();
         tempDir.deleteOnExit();
         registerNewResource(tempDir.getAbsolutePath());
@@ -99,10 +101,15 @@ public class TestPigServer {
 
     @After
     public void tearDown() throws Exception{
-        pig = null;
         tempDir.delete();
     }
 
+    @BeforeClass
+    public static void oneTimeSetup() {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
@@ -119,9 +126,9 @@ public class TestPigServer {
             if (url.toString().contains(name)) {
                 if (!included) {
                     fail("Included is false, but url ["+url+"] contains name ["+name+"]");
-            }
+                }
                 assertEquals("Too many urls contain name: " + name, 1, ++count);
-        }
+            }
         }
         if (included) {
             assertEquals("Number of urls that contain name [" + name + "] != 1", 1, count);
@@ -150,8 +157,7 @@ public class TestPigServer {
         URL urlToAdd = new File(file).toURI().toURL();
         URLClassLoader sysLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
         Method addMethod = URLClassLoader.class.
-                            getDeclaredMethod("addURL",
-                                              new Class[]{URL.class});
+                getDeclaredMethod("addURL", new Class[]{URL.class});
         addMethod.setAccessible(true);
         addMethod.invoke(sysLoader, new Object[]{urlToAdd});
     }
@@ -166,6 +172,7 @@ public class TestPigServer {
         String jarName = "BadFileNameTestJarNotPresent.jar";
 
         // jar name is not present to start with
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         verifyStringContained(pig.getPigContext().extraJars, jarName, false);
         boolean raisedException = false;
         try {
@@ -192,9 +199,10 @@ public class TestPigServer {
 
         createFakeJarFile(jarLocation, jarName);
 
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         verifyStringContained(pig.getPigContext().extraJars, jarName, false);
 
-            pig.registerJar(jarLocation + jarName);
+        pig.registerJar(jarLocation + jarName);
         verifyStringContained(pig.getPigContext().extraJars, jarName, true);
 
         // clean-up
@@ -221,12 +229,13 @@ public class TestPigServer {
         createFakeJarFile(jarLocation1, jarName);
         createFakeJarFile(jarLocation2, jarName);
 
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         verifyStringContained(pig.getPigContext().extraJars, jarName, false);
 
         registerNewResource(jarLocation1);
         registerNewResource(jarLocation2);
 
-            pig.registerJar(jarName);
+        pig.registerJar(jarName);
         verifyStringContained(pig.getPigContext().extraJars, jarName, true);
 
         // clean-up
@@ -288,6 +297,7 @@ public class TestPigServer {
 
         // load the specific resource
         boolean exceptionRaised = false;
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         try {
             pig.registerJar("sub_dir/TestRegisterJar.class");
         }
@@ -314,7 +324,8 @@ public class TestPigServer {
         createFakeJarFile(jarLocation, jar1Name);
         createFakeJarFile(jarLocation, jar2Name);
 
-            pig.registerJar(jarLocation + "TestRegisterJarGlobbing*.jar");
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+        pig.registerJar(jarLocation + "TestRegisterJarGlobbing*.jar");
         verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
         verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
 
@@ -335,7 +346,8 @@ public class TestPigServer {
         createFakeJarFile(jarLocation, jar2Name);
 
         String currentDir = System.getProperty("user.dir");
-            pig.registerJar(new File(currentDir, dir) + FILE_SEPARATOR + "TestRegisterJarGlobbing*.jar");
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+        pig.registerJar(new File(currentDir, dir) + FILE_SEPARATOR + "TestRegisterJarGlobbing*.jar");
         verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
         verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
 
@@ -360,7 +372,8 @@ public class TestPigServer {
         // depend on configuration
         String absPath = fs.getFileStatus(new Path(jarLocation)).getPath().toString();
 
-            pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+        pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
 
         verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
         verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
@@ -370,6 +383,7 @@ public class TestPigServer {
     }
 
     public void testDescribeLoad() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         Schema dumpedSchema = pig.dumpSchema("a") ;
         Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
@@ -378,6 +392,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeFilter() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = filter a by field1 > 10;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -387,6 +402,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeDistinct() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = distinct a ;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -396,6 +412,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeSort() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = order a by * desc;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -405,6 +422,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeLimit() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = limit a 10;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -414,6 +432,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeForeach() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = foreach a generate field1 + 10;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -423,7 +442,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeForeachFail() throws Throwable {
-
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = foreach a generate field1 + 10;") ;
         try {
@@ -436,6 +455,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeForeachNoSchema() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' ;") ;
         pig.registerQuery("b = foreach a generate *;") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -444,6 +464,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeCogroup() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
         pig.registerQuery("c = cogroup a by field1, b by field4;") ;
@@ -454,6 +475,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeCross() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
         pig.registerQuery("c = cross a, b;") ;
@@ -464,6 +486,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeJoin() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
         pig.registerQuery("c = join a by field1, b by field4;") ;
@@ -474,6 +497,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeUnion() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
         pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
         pig.registerQuery("c = union a, b;") ;
@@ -484,6 +508,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeTuple2Elem() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (field1: int, field2: int, field3: int );") ;
         pig.registerQuery("b = foreach a generate field1, (field2, field3);") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -493,6 +518,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeComplex() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
         pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
         Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -504,6 +530,7 @@ public class TestPigServer {
     }
 
     private void registerScalarScript(boolean useScalar, String expectedSchemaStr) throws IOException {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("A = load 'adata' AS (a: int, b: int);");
         //scalar
         pig.registerQuery("C = FOREACH A GENERATE *;");
@@ -533,6 +560,12 @@ public class TestPigServer {
 
     @Test
     public void testExplainXmlComplex() throws Throwable {
+        // TODO: Explain XML output is not supported in non-MR mode. Remove the
+        // following condition once it's implemented in Tez.
+        if (cluster.getExecType() != ExecType.MAPREDUCE) {
+            return;
+        }
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
         pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
         pig.registerQuery("c = group b by site;");
@@ -614,6 +647,7 @@ public class TestPigServer {
         String absPath = fs.getFileStatus(new Path(scriptName)).getPath().toString();
 
         Util.createInputFile(cluster, "testRegisterRemoteScript_input", new String[]{"1", "2"});
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerCode(absPath, "jython", "pig");
         pig.registerQuery("a = load 'testRegisterRemoteScript_input';");
         pig.registerQuery("b = foreach a generate pig.helloworld($0);");
@@ -631,26 +665,6 @@ public class TestPigServer {
 
         assertFalse(iter.hasNext());
     }
-
-    // PIG-3469
-    @Test
-    public void testNonExistingSecondDirectoryInSkewJoin() throws Exception {
-        String script =
-          "exists = LOAD 'test/org/apache/pig/test/data/InputFiles/jsTst1.txt' AS (x:chararray, a:long);" +
-          "missing = LOAD '/non/existing/directory' AS (a:long);" +
-          "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
-          "joined = JOIN exists BY a, missing BY a USING 'skewed';" +
-          "STORE joined INTO '/tmp/test_out.tsv';";
-
-        PigServer pig = new PigServer(ExecType.LOCAL);
-        // Execution of the script should fail, but without throwing any exceptions (such as NPE)
-        try {
-            pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
-        } catch(Exception ex) {
-            fail("Unexpected exception: " + ex);
-        }
-    }
-
     @Test
     public void testParamSubstitution() throws Exception{
         // using params map
@@ -798,6 +812,7 @@ public class TestPigServer {
 
     @Test
     public void testDescribeForEachFlatten() throws Throwable {
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a';") ;
         pig.registerQuery("b = group a by $0;") ;
         pig.registerQuery("c = foreach b generate flatten(a);") ;
@@ -807,7 +822,8 @@ public class TestPigServer {
 
     @Test // PIG-2059
     public void test1() throws Throwable {
-    	pig.setValidateEachStatement(true);
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+        pig.setValidateEachStatement(true);
         pig.registerQuery("A = load 'x' as (u, v);") ;
         try {
             pig.registerQuery("B = foreach A generate $2;") ;
@@ -820,40 +836,40 @@ public class TestPigServer {
     }
 
     @Test
-	public void testDefaultPigProperties() throws Throwable {
-    	//Test with PigServer
-    	PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
-    	Properties properties = pigServer.getPigContext().getProperties();
+    public void testDefaultPigProperties() throws Throwable {
+        //Test with PigServer
+        PigServer pigServer = new PigServer(cluster.getExecType());
+        Properties properties = pigServer.getPigContext().getProperties();
 
         assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
         assertEquals("true", properties.getProperty("aggregate.warning"));
         assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
         assertEquals("false", properties.getProperty("stop.on.failure"));
 
-		//Test with properties file
-		File propertyFile = new File(tempDir, "pig.properties");
+        //Test with properties file
+        File propertyFile = new File(tempDir, "pig.properties");
 
-		properties = PropertiesUtil.loadDefaultProperties();
+        properties = PropertiesUtil.loadDefaultProperties();
 
-		assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
+        assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
         assertEquals("true", properties.getProperty("aggregate.warning"));
         assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
         assertEquals("false", properties.getProperty("stop.on.failure"));
 
-		PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
-		out.println("aggregate.warning=false");
-		out.println("opt.multiquery=false");
-		out.println("stop.on.failure=true");
-
-		out.close();
-
-		properties = PropertiesUtil.loadDefaultProperties();
-		assertEquals("false", properties.getProperty("aggregate.warning"));
-		assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
-		assertEquals("true", properties.getProperty("stop.on.failure"));
+        PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
+        out.println("aggregate.warning=false");
+        out.println("opt.multiquery=false");
+        out.println("stop.on.failure=true");
+
+        out.close();
 
-		propertyFile.delete();
-	}
+        properties = PropertiesUtil.loadDefaultProperties();
+        assertEquals("false", properties.getProperty("aggregate.warning"));
+        assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+        assertEquals("true", properties.getProperty("stop.on.failure"));
+
+        propertyFile.delete();
+    }
 
     @Test
     public void testSecondarySort() throws Exception {

Modified: pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java Sat Mar  1 07:22:25 2014
@@ -18,7 +18,6 @@
 
 package org.apache.pig.test;
 
-import static org.apache.pig.ExecType.MAPREDUCE;
 import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.*;
 
@@ -26,16 +25,13 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
@@ -52,6 +48,8 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -59,18 +57,15 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestPigStorage  {
-
-    protected final Log log = LogFactory.getLog(getClass());
-
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    static PigServer pig;
-    static final String datadir = "build/test/tmpdata/";
-
-    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
-    Map<String, String> fileNameMap = new HashMap<String, String>();
+    private static PigServer pig;
+    private static PigContext pigContext;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+    private static final String datadir = "build/test/tmpdata/";
 
     @Before
     public void setup() throws IOException {
@@ -99,6 +94,13 @@ public class TestPigStorage  {
         pig.shutdown();
     }
 
+    @BeforeClass
+    public static void oneTimeSetup() { // runs once per class; the cluster is shut down in shutdown()
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+        pigContext = new PigContext(ExecType.LOCAL, new Properties()); // local mode, empty properties (not the cluster's)
+    }
+
     @AfterClass
     public static void shutdown() {
         cluster.shutDown();
@@ -120,11 +122,11 @@ public class TestPigStorage  {
         // This tests PigStorage loader with records exactly
         // on the boundary of the file blocks.
         Properties props = new Properties();
-        for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
             props.put(entry.getKey(), entry.getValue());
         }
         props.setProperty("mapred.max.split.size", "20");
-        PigServer pigServer = new PigServer(MAPREDUCE, props);
+        PigServer pigServer = new PigServer(cluster.getExecType(), props);
         String[] inputs = {
                 "abcdefgh1", "abcdefgh2", "abcdefgh3",
                 "abcdefgh4", "abcdefgh5", "abcdefgh6",
@@ -191,7 +193,7 @@ public class TestPigStorage  {
                 inputFileName,
                 new String[] {"1\t2\t3", "4", "5\t6\t7"});
         String script = "a = load '" + inputFileName + "' as (i:int, j:int, k:int);" +
-        		"b = foreach a generate j, k;";
+                "b = foreach a generate j, k;";
         Util.registerMultiLineQuery(pig, script);
         Iterator<Tuple> it = pig.openIterator("b");
         assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next());
@@ -261,9 +263,9 @@ public class TestPigStorage  {
         "as (f1:chararray, f2:int);";
         pig.registerQuery(query);
         pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
-    
+
         // aout now has a schema.
-    
+
         // Verify that loaded data has the correct data type after the prune
         pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t'); c = FOREACH b GENERATE f2;");
         
@@ -521,40 +523,41 @@ public class TestPigStorage  {
             pig.mkdirs(globtestdir+"b");
         } catch (IOException e) {};
 
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
         // if schema file is not found, schema is null
-        ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(conf));
         Assert.assertTrue(schema==null);
 
         // if .pig_schema is in the input directory
         putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(conf));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
         new File(globtestdir+"a/a0/.pig_schema").delete();
 
         // .pig_schema in one of globStatus returned directory
         putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
         new File(globtestdir+"a/.pig_schema").delete();
 
         putSchemaFile(globtestdir+"b/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
         new File(globtestdir+"b/.pig_schema").delete();
 
         // if .pig_schema is deep in the globbing, it will not get used
         putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
         Assert.assertTrue(schema==null);
         putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
         new File(globtestdir+"a/a0/.pig_schema").delete();
         new File(globtestdir+"a/.pig_schema").delete();
 
         pigStorage = new PigStorage("\t", "-schema");
         putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
-        schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(conf));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
     }
 

Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Sat Mar  1 07:22:25 2014
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -35,6 +37,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
@@ -44,6 +47,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -95,7 +99,7 @@ public class TestSkewedJoin {
 
         int k = 0;
         for(int j=0; j<120; j++) {
-               w.println("100\tapple1\taaa" + k);
+            w.println("100\tapple1\taaa" + k);
             k++;
             w.println("200\torange1\tbbb" + k);
             k++;
@@ -532,4 +536,77 @@ public class TestSkewedJoin {
             }
         }
     }
+
+    // PIG-3469
+    // This query should fail with nothing else but InvalidInputException.
+    // Depending on where input splits are computed, that exception shows up
+    // either in the front-end exception chain or only in the Pig log file,
+    // so both are searched before declaring a failure.
+    @Test
+    public void testNonExistingInputPathInSkewJoin() throws Exception {
+        String script =
+          "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
+          "missing = LOAD '/non/existing/directory' AS (a:long);" +
+          "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
+          "joined = JOIN exists BY a, missing BY a USING 'skewed';";
+
+        // Point pig.logfile at a temp file that is searched below as a fallback.
+        String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
+        String oldValue = (String) properties.setProperty("pig.logfile", logFile);
+
+        try {
+            pigServer.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
+            pigServer.openIterator("joined");
+            // Without this, a query that unexpectedly succeeds would pass silently.
+            fail("Query on a non-existing input path should have failed");
+        } catch (Exception e) {
+            // Search e and its chained causes for InvalidInputException. If
+            // input splits are calculated on the front-end, it appears here.
+            boolean foundInvalidInputException = false;
+            for (Throwable t = e; t != null; t = t.getCause()) {
+                if (t instanceof InvalidInputException) {
+                    foundInvalidInputException = true;
+                    break;
+                }
+            }
+
+            // The exception may have been thrown in the back-end without being
+            // propagated to the front-end, so fall back to the log file.
+            if (!foundInvalidInputException) {
+                foundInvalidInputException =
+                        logFileContains(logFile, InvalidInputException.class.getName());
+            }
+
+            assertTrue("This exception was not caused by InvalidInputException: " + e,
+                    foundInvalidInputException);
+        } finally {
+            // Restore the previous value, or drop the key if there was none, so
+            // the temp log file setting does not leak into other tests.
+            if (oldValue != null) {
+                properties.setProperty("pig.logfile", oldValue);
+            } else {
+                properties.remove("pig.logfile");
+            }
+        }
+    }
+
+    /**
+     * Returns true if any line of the given file contains {@code text}.
+     * The reader is always closed, even if reading fails.
+     */
+    private static boolean logFileContains(String logFile, String text) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(logFile));
+        try {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.contains(text)) {
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            reader.close();
+        }
+    }
+
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java Sat Mar  1 07:22:25 2014
@@ -23,7 +23,6 @@ import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.Properties;
 
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;

Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Sat Mar  1 07:22:25 2014
@@ -6,5 +6,10 @@
 **/TestSkewedJoin.java
 **/TestSplitStore.java
 **/TestCustomPartitioner.java
+**/TestPigContext.java
+**/TestPigStorage.java
+**/TestNestedForeach.java
+**/TestEvalPipeline.java
+**/TestPigServer.java
 ## TODO: Runs fine individually. Hangs with file.out.index not found when run together. Likely Tez Bug
 ##**/TestSecondarySortTez.java