Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [17/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/ba...

Added: pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java Fri May 30 19:07:23 2014
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCustomPartitioner {
+    private static MiniGenericCluster cluster;
+    private static Properties properties;
+    private static PigServer pigServer;
+    private static FileSystem fs;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+
+    @Before
+    public void setUp() throws Exception {
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(cluster.getExecType(), properties);
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+        fs = cluster.getFileSystem();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerParseJoins() throws Exception{
+        String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+
+        // Custom Partitioner is not allowed for skewed joins; the parser will throw a FrontendException
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+            pigServer.registerQuery("B = ORDER A by $0;");
+            pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+            //control should not reach here
+            Assert.fail("Skewed join cannot accept a custom partitioner");
+        } catch(FrontendException e) {
+            Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
+        }
+
+        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("hash");
+
+        List<String> expected = new ArrayList<String>();
+        expected.add("(1,3,1,2)");
+        expected.add("(1,3,1,3)");
+        expected.add("(1,2,1,2)");
+        expected.add("(1,2,1,3)");
+        Collections.sort(expected);
+
+        List<String> actual = new ArrayList<String>();
+        while (iter.hasNext()) {
+            actual.add(iter.next().toString());
+        }
+        Collections.sort(actual);
+
+        Assert.assertEquals(expected, actual);
+
+        // No checks are made for merge and replicated joins, as they compile to a map-only job.
+        // No frontend error checking has been added for those jobs, so no test cases are added here.
+        // Sanity was verified manually once; the test above covers the basic scenario.
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+
+        String outputDir = "tmp_testCustomPartitionerGroup";
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+        // Note that for a MapReduce job, the total number of partitions equals the
+        // number of reduce tasks for the job. Hence we need a case with more than one
+        // reduce task so that the custom partitioner actually comes into play.
+        // The following logic assumes 2 reduce tasks, so the expectations can be hard-coded.
+        // SimpleCustomPartitioner3 simply returns '1' (the second reducer) for all inputs
+        // whenever the number of partitions is greater than 1 (a sketch of such a
+        // partitioner follows this file).
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", outputDir);
+
+        new File(outputDir).mkdir();
+        FileStatus[] outputFiles = fs.listStatus(new Path(outputDir), Util.getSuccessMarkerPathFilter());
+
+        Util.copyFromClusterToLocal(cluster, outputFiles[0].getPath().toString(), outputDir + "/" + 0);
+        BufferedReader reader = new BufferedReader(new FileReader(outputDir + "/" + 0));
+        while(reader.readLine() != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        reader.close();
+
+        Util.copyFromClusterToLocal(cluster, outputFiles[1].getPath().toString(), outputDir + "/" + 1);
+        reader = new BufferedReader(new FileReader(outputDir + "/" + 1));
+        int count=0;
+        while(reader.readLine() != null) {
+            // all outputs should go to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        reader.close();
+        Assert.assertEquals(4, count);
+
+        Util.deleteDirectory(new File(outputDir));
+        Util.deleteFile(cluster, outputDir);
+        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+    }
+
+    // See PIG-3385
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        String outputDir = "tmp_testCustomPartitionerDistinct";
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", outputDir);
+
+        new File(outputDir).mkdir();
+        FileStatus[] outputFiles = fs.listStatus(new Path(outputDir), Util.getSuccessMarkerPathFilter());
+
+        // SimpleCustomPartitioner3 simply partitions all inputs to the *second* reducer
+        Util.copyFromClusterToLocal(cluster, outputFiles[0].getPath().toString(), outputDir + "/" + 0);
+        BufferedReader reader = new BufferedReader(new FileReader(outputDir + "/" + 0));
+        while (reader.readLine() != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        reader.close();
+
+        Util.copyFromClusterToLocal(cluster, outputFiles[1].getPath().toString(), outputDir + "/" + 1);
+        reader = new BufferedReader(new FileReader(outputDir + "/" + 1));
+        int count=0;
+        while (reader.readLine() != null) {
+            // all outputs should go to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        reader.close();
+        Assert.assertEquals(4, count);
+
+        Util.deleteDirectory(new File(outputDir));
+        Util.deleteFile(cluster, outputDir);
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerCross() throws Exception{
+        String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = ORDER A by $0;");
+        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Tuple t;
+
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+
+        Assert.assertTrue(iter.hasNext());
+        t = iter.next();
+        Assert.assertTrue(t.size()==4);
+        Assert.assertTrue(results.contains(t.toString()));
+
+        Assert.assertTrue(iter.hasNext());
+        t = iter.next();
+        Assert.assertTrue(t.size()==4);
+        Assert.assertTrue(results.contains(t.toString()));
+
+        Assert.assertTrue(iter.hasNext());
+        t = iter.next();
+        Assert.assertTrue(t.size()==4);
+        Assert.assertTrue(results.contains(t.toString()));
+
+        Assert.assertTrue(iter.hasNext());
+        t = iter.next();
+        Assert.assertTrue(t.size()==4);
+        Assert.assertTrue(results.contains(t.toString()));
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+    }
+}
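
For context, SimpleCustomPartitioner3 above lives in test/org/apache/pig/test/utils and is not part of this hunk. A minimal sketch of a partitioner with the behavior the test comments describe (every record routed to reducer 1 once more than one reducer is configured), assuming Pig's documented custom-partitioner contract (extend org.apache.hadoop.mapreduce.Partitioner, with keys arriving as PigNullableWritable), would look like this:

    package org.apache.pig.test.utils;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.pig.impl.io.PigNullableWritable;

    // Sketch only: send every record to partition 1 whenever the job runs with
    // more than one reducer, leaving partition 0 empty, which is exactly what
    // testCustomPartitionerGroups and testCustomPartitionerDistinct assert.
    public class SimpleCustomPartitioner3 extends Partitioner<PigNullableWritable, Writable> {
        @Override
        public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
            return numPartitions > 1 ? 1 : 0;
        }
    }

SimpleCustomPartitioner, used by the join and cross tests, would differ only in its getPartition body, for example hashing the key modulo numPartitions.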

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri May 30 19:07:23 2014
@@ -28,14 +28,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.StringTokenizer;
 
-import junit.framework.Assert;
-
-import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -56,32 +53,37 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-@RunWith(JUnit4.class)
 public class TestEvalPipeline {
-    
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    private PigServer pigServer;
-    private PigContext pigContext;
-
-    TupleFactory mTf = TupleFactory.getInstance();
-    BagFactory mBf = BagFactory.getInstance();
-    
+    private static PigServer pigServer;
+    private static PigContext pigContext;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    private TupleFactory mTf = TupleFactory.getInstance();
+    private BagFactory mBf = BagFactory.getInstance();
+
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), properties);
         pigContext = pigServer.getPigContext();
     }
-    
+
+    @BeforeClass
+    public static void oneTimeSetup() {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     static public class MyBagFunction extends EvalFunc<DataBag>{
         @Override
         public DataBag exec(Tuple input) throws IOException {
@@ -93,35 +95,35 @@ public class TestEvalPipeline {
             return output;
         }
     }
-    
+
     @Test
     public void testFunctionInsideFunction() throws Exception{
         File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
 
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(f1.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(f1.toString(), pigContext)
                 + "' using " + PigStorage.class.getName() + "(':');");
         pigServer.registerQuery("b = foreach a generate 1-1/1;");
         Iterator<Tuple> iter  = pigServer.openIterator("b");
-        
+
         for (int i=0 ;i<3; i++){
-            Assert.assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
+            Assert.assertEquals(0.0d, DataType.toDouble(iter.next().get(0)).doubleValue(), 0.0d);
         }
     }
-    
+
     @Test
     public void testJoin() throws Exception{
         File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
         File f2 = Util.createFile(new String[]{"b","b","a"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(f1.toString(), pigContext) + "' using " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(f1.toString(), pigContext) + "' using "
                 + PigStorage.class.getName() + "(':');");
-        pigServer.registerQuery("b = load '" 
+        pigServer.registerQuery("b = load '"
                 + Util.generateURI(f2.toString(), pigContext) + "';");
-        pigServer.registerQuery("c = cogroup a by $0, b by $0;");        
+        pigServer.registerQuery("c = cogroup a by $0, b by $0;");
         pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-        
+
         Iterator<Tuple> iter = pigServer.openIterator("d");
         int count = 0;
         while(iter.hasNext()){
@@ -131,7 +133,7 @@ public class TestEvalPipeline {
         }
         Assert.assertEquals(count, 4);
     }
-    
+
     @Test
     public void testDriverMethod() throws Exception{
         File f = Util.createTempFileDelOnExit("tmp", "");
@@ -139,8 +141,8 @@ public class TestEvalPipeline {
         pw.println("a");
         pw.println("a");
         pw.close();
-        pigServer.registerQuery("a = foreach (load '" 
-                + Util.generateURI(f.toString(), pigContext) + "') " 
+        pigServer.registerQuery("a = foreach (load '"
+                + Util.generateURI(f.toString(), pigContext) + "') "
                 + "generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
         Iterator<Tuple> iter = pigServer.openIterator("a");
         int count = 0;
@@ -153,61 +155,61 @@ public class TestEvalPipeline {
         Assert.assertEquals(count, 6);
         f.delete();
     }
-    
+
     @Test
     public void testMapLookup() throws Exception {
         DataBag b = BagFactory.getInstance().newDefaultBag();
         Map<String, Object> colors = new HashMap<String, Object>();
         colors.put("apple","red");
         colors.put("orange","orange");
-        
+
         Map<String, Object> weights = new HashMap<String, Object>();
         weights.put("apple","0.1");
         weights.put("orange","0.3");
-        
+
         Tuple t = mTf.newTuple();
         t.append(colors);
         t.append(weights);
         b.add(t);
-        
+
         File tmpFile = File.createTempFile("tmp", "");
         tmpFile.delete(); // we only needed the temp file name, so delete the file
         String fileName = Util.removeColon(tmpFile.getAbsolutePath());
 
         PigFile f = new PigFile(fileName);
         f.store(b, new FuncSpec(BinStorage.class.getCanonicalName()),
-                pigServer.getPigContext());        
-        
+                pigServer.getPigContext());
+
         pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
         pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
         Iterator<Tuple> iter = pigServer.openIterator("b");
         t = iter.next();
         Assert.assertEquals(t.get(0).toString(), "red");
-        Assert.assertEquals(DataType.toDouble(t.get(1)), 0.3);
+        Assert.assertEquals(0.3d, DataType.toDouble(t.get(1)).doubleValue(), 0.0d);
         Assert.assertFalse(iter.hasNext());
         Util.deleteFile(cluster, fileName);
     }
-    
+
     static public class TitleNGrams extends EvalFunc<DataBag> {
-        
+
         @Override
-        public DataBag exec(Tuple input) throws IOException {    
+        public DataBag exec(Tuple input) throws IOException {
             try {
                 DataBag output = BagFactory.getInstance().newDefaultBag();
                 String str = input.get(0).toString();
-            
+
                 String title = str;
 
                 if (title != null) {
                     List<String> nGrams = makeNGrams(title);
-                    
+
                     for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
                         Tuple t = TupleFactory.getInstance().newTuple(1);
                         t.set(0, it.next());
                         output.add(t);
                     }
                 }
-    
+
                 return output;
             } catch (ExecException ee) {
                 IOException ioe = new IOException(ee.getMessage());
@@ -215,28 +217,28 @@ public class TestEvalPipeline {
                 throw ioe;
             }
         }
-        
-        
+
+
         List<String> makeNGrams(String str) {
             List<String> tokens = new ArrayList<String>();
-            
+
             StringTokenizer st = new StringTokenizer(str);
             while (st.hasMoreTokens())
                 tokens.add(st.nextToken());
-            
+
             return nGramHelper(tokens, new ArrayList<String>());
         }
-        
+
         ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
             if (str.size() == 0)
                 return nGrams;
-            
+
             for (int i = 0; i < str.size(); i++)
                 nGrams.add(makeString(str.subList(0, i+1)));
-            
+
             return nGramHelper(str.subList(1, str.size()), nGrams);
         }
-        
+
         String makeString(List<String> list) {
             StringBuffer sb = new StringBuffer();
             for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
@@ -288,18 +290,18 @@ public class TestEvalPipeline {
             myMap.put("map", mapInMap);
             myMap.put("tuple", tuple);
             myMap.put("bag", bag);
-            return myMap; 
+            return myMap;
         }
 
         public Schema outputSchema(Schema input) {
             return new Schema(new Schema.FieldSchema(null, DataType.MAP));
         }
     }
-    
+
     @Test
     public void testBagFunctionWithFlattening() throws Exception{
         File queryLogFile = Util.createFile(
-                    new String[]{ 
+                    new String[]{
                         "stanford\tdeer\tsighting",
                         "bush\tpresident",
                         "stanford\tbush",
@@ -309,66 +311,52 @@ public class TestEvalPipeline {
                         "stanford\tpresident",
                     }
                 );
-                
+
         File newsFile = Util.createFile(
                     new String[]{
                         "deer seen at stanford",
-                        "george bush visits stanford", 
-                        "yahoo hosting a conference in the bay area", 
+                        "george bush visits stanford",
+                        "yahoo hosting a conference in the bay area",
                         "who will win the world cup"
                     }
-                );    
-        
+                );
+
         Map<String, Integer> expectedResults = new HashMap<String, Integer>();
         expectedResults.put("bush", 2);
         expectedResults.put("stanford", 3);
         expectedResults.put("world", 1);
         expectedResults.put("conference", 1);
-        
-        pigServer.registerQuery("newsArticles = LOAD '" 
-                + Util.generateURI(newsFile.toString(), pigContext) 
+
+        pigServer.registerQuery("newsArticles = LOAD '"
+                + Util.generateURI(newsFile.toString(), pigContext)
                 + "' USING " + TextLoader.class.getName() + "();");
-        pigServer.registerQuery("queryLog = LOAD '" 
+        pigServer.registerQuery("queryLog = LOAD '"
                 + Util.generateURI(queryLogFile.toString(), pigContext) + "';");
 
         pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
         pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
         pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-        
+
         Iterator<Tuple> iter = pigServer.openIterator("answer");
         if(!iter.hasNext()) Assert.fail("No Output received");
         while(iter.hasNext()){
             Tuple t = iter.next();
-            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
+            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),
+                    (DataType.toDouble(t.get(0))).doubleValue(), 0.0d);
         }
     }
-    
-    /*    
-    @Test
-    public void testSort() throws Exception{
-        testSortDistinct(false, false);
-    }    
-    */    
 
     @Test
-    public void testSortWithUDF() throws Exception{
-        testSortDistinct(false, true);
-    }    
+    public void testSort() throws Exception{
+        testSortDistinct(false);
+    }
 
     @Test
     public void testDistinct() throws Exception{
-        testSortDistinct(true, false);
+        testSortDistinct(true);
     }
-    
-    public static class TupComp extends ComparisonFunc {
 
-        @Override
-        public int compare(Tuple t1, Tuple t2) {
-            return t1.compareTo(t2);
-        }
-    }
-
-    private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
+    private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
         int LOOP_SIZE = 1024*16;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -376,18 +364,14 @@ public class TestEvalPipeline {
         for(int i = 0; i < LOOP_SIZE; i++) {
             ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
         }
-        ps.close(); 
-        
-        pigServer.registerQuery("A = LOAD '" 
+        ps.close();
+
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         if (eliminateDuplicates){
             pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
         }else{
-            if(!useUDF) {
-                pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
-            } else {
-                pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
-            }
+            pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
         }
         Iterator<Tuple> iter = pigServer.openIterator("B");
         String last = "";
@@ -404,9 +388,9 @@ public class TestEvalPipeline {
                 Assert.assertEquals(t.size(), 2);
                 last = t.get(0).toString();
             }
-        }        
+        }
     }
-    
+
     @Test
     public void testNestedPlan() throws Exception{
         int LOOP_COUNT = 10;
@@ -420,7 +404,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -460,7 +444,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -502,7 +486,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = limit A 5;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -514,15 +498,15 @@ public class TestEvalPipeline {
         }
         Assert.assertEquals(5, numIdentity);
     }
-    
+
     @Test
     public void testComplexData() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
@@ -532,10 +516,10 @@ public class TestEvalPipeline {
         Assert.assertEquals("2", t.get(2).toString());
         Assert.assertEquals("value1", t.get(3).toString());
         Assert.assertEquals("value2", t.get(4).toString());
-        
+
         //test with BinStorage
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         String output = "/pig/out/TestEvalPipeline-testComplexData";
         pigServer.deleteFile(output);
@@ -549,17 +533,17 @@ public class TestEvalPipeline {
         Assert.assertEquals("1", t.get(1).toString());
         Assert.assertEquals("2", t.get(2).toString());
         Assert.assertEquals("value1", t.get(3).toString());
-        Assert.assertEquals("value2", t.get(4).toString());        
+        Assert.assertEquals("value2", t.get(4).toString());
     }
 
     @Test
     public void testBinStorageDetermineSchema() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
@@ -569,10 +553,10 @@ public class TestEvalPipeline {
         Assert.assertEquals(2, t.get(2));
         Assert.assertEquals("value1", t.get(3).toString());
         Assert.assertEquals("value2", t.get(4).toString());
-        
+
         //test with BinStorage
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
         pigServer.deleteFile(output);
@@ -587,7 +571,7 @@ public class TestEvalPipeline {
         String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b as t2b, m#'key1', m#'key2', b;",
                 "q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
                 "q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
-        
+
         for (int i = 0; i < loads.length; i++) {
             pigServer.registerQuery(loads[i]);
             pigServer.registerQuery(generates[i]);
@@ -605,18 +589,18 @@ public class TestEvalPipeline {
             for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
                 Tuple bt = bit.next();
                 Assert.assertEquals(String.class, bt.get(0).getClass());
-                Assert.assertEquals(String.class, bt.get(1).getClass());            
+                Assert.assertEquals(String.class, bt.get(1).getClass());
             }
-        }        
+        }
     }
 
     @Test
     public void testProjectBag() throws IOException, ExecException {
         // This test makes sure that when a bag with multiple columns is
         // projected, all columns appear in the output
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"f1\tf2\tf3"});
-        pigServer.registerQuery("a = load '" 
+        pigServer.registerQuery("a = load '"
                 + Util.generateURI(input.toString(), pigContext) + "' as (x, y, z);");
         pigServer.registerQuery("b = group a by x;");
         pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
@@ -630,11 +614,11 @@ public class TestEvalPipeline {
     @Test
     public void testBinStorageDetermineSchema2() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"pigtester\t10\t1.2"});
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
         pigServer.deleteFile(output);
@@ -649,7 +633,7 @@ public class TestEvalPipeline {
         String[] generates = {"q = foreach p generate name, age, gpa;",
                 "q = foreach p generate name, age, gpa;",
                 "q = foreach p generate $0, $1, $2;"};
-        
+
         for (int i = 0; i < loads.length; i++) {
             pigServer.registerQuery(loads[i]);
             pigServer.registerQuery(generates[i]);
@@ -662,7 +646,7 @@ public class TestEvalPipeline {
             Assert.assertEquals(1.2, t.get(2));
             Assert.assertEquals(Double.class, t.get(2).getClass());
         }
-        
+
         // test that valid casting is allowed
         pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
                 " as (name, age:long, gpa:float);");
@@ -675,7 +659,7 @@ public class TestEvalPipeline {
         Assert.assertEquals(Long.class, t.get(1).getClass());
         Assert.assertEquals(1.2f, t.get(2));
         Assert.assertEquals(Float.class, t.get(2).getClass());
-        
+
         // test that implicit casts work
         pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
         " as (name, age, gpa);");
@@ -689,15 +673,15 @@ public class TestEvalPipeline {
         Assert.assertEquals(1, t.get(2));
         Assert.assertEquals(Integer.class, t.get(2).getClass());
     }
-    
+
     @Test
     public void testCogroupWithInputFromGroup() throws IOException, ExecException {
         // Create input file with ascii data
-        File input = Util.createInputFile("tmp", "", 
-                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2", 
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
                 "pigtester2\t10\t1.2",
                 "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
-        
+
         Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
         // we will in essence be doing a group on first column and getting
         // SUM over second column and a count for the group - store
@@ -705,13 +689,13 @@ public class TestEvalPipeline {
         resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
         resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
         resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
-        
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
-        pigServer.registerQuery("c = load '" 
-                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() " 
+        pigServer.registerQuery("c = load '"
+                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                 + "as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("d = cogroup b by group, c by name;");
         pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
@@ -719,27 +703,28 @@ public class TestEvalPipeline {
         for(int i = 0; i < resultMap.size(); i++) {
             Tuple t = it.next();
             Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
-            Pair<Long, Long> output = resultMap.get(t.get(0)); 
+            Pair<Long, Long> output = resultMap.get(t.get(0));
             Assert.assertEquals(output.first, t.get(1));
             Assert.assertEquals(output.second, t.get(2));
         }
     }
-    
+
     @Test
     public void testUtf8Dump() throws IOException, ExecException {
-        
+
         // Create input file with unicode data
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"wendyξ"});
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext)
                 + "' using PigStorage() " + "as (name:chararray);");
         Iterator<Tuple> it = pigServer.openIterator("a");
         Tuple t = it.next();
         Assert.assertEquals("wendyξ", t.get(0));
-        
+
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testMapUDF() throws Exception{
         int LOOP_COUNT = 2;
@@ -753,7 +738,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
         String query = "C = foreach B {"
@@ -793,7 +778,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
         String query = "C = foreach B {"
@@ -811,19 +796,19 @@ public class TestEvalPipeline {
 
     @Test
     public void testLoadCtorArgs() throws IOException, ExecException {
-        
+
         // Create input file
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                 new String[] {"hello:world"});
-        pigServer.registerQuery("a = load '" 
-                + Util.generateURI(input.toString(), pigContext) 
+        pigServer.registerQuery("a = load '"
+                + Util.generateURI(input.toString(), pigContext)
                 + "' using org.apache.pig.test.PigStorageNoDefCtor(':');");
         pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
         Assert.assertEquals("hello", t.get(0));
         Assert.assertEquals("world", t.get(1));
-        
+
     }
 
     @Test
@@ -839,7 +824,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -881,7 +866,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -928,7 +913,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -969,7 +954,7 @@ public class TestEvalPipeline {
         }
         ps.close();
 
-        pigServer.registerQuery("A = LOAD '" 
+        pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A ;"); //the argument does not matter
         pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter
@@ -990,7 +975,7 @@ public class TestEvalPipeline {
 
         Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
-    
+
     @Test
     public void testCogroupAfterDistinct() throws Exception {
         String[] input1 = {
@@ -1011,13 +996,13 @@ public class TestEvalPipeline {
             };
         Util.createInputFile(cluster, "table1", input1);
         Util.createInputFile(cluster, "table2", input2);
-        
+
         pigServer.registerQuery("nonuniqtable1 = LOAD 'table1' AS (f1:chararray);");
         pigServer.registerQuery("table1 = DISTINCT nonuniqtable1;");
         pigServer.registerQuery("table2 = LOAD 'table2' AS (f1:chararray, f2:int);");
         pigServer.registerQuery("temp = COGROUP table1 BY f1 INNER, table2 BY f1;");
         Iterator<Tuple> it = pigServer.openIterator("temp");
-        
+
         // results should be:
         // (abc,{(abc)},{})
         // (def,{(def)},{})
@@ -1025,7 +1010,7 @@ public class TestEvalPipeline {
         HashMap<String, Tuple> results = new HashMap<String, Tuple>();
         Object[] row = new Object[] { "abc",
                 Util.createBagOfOneColumn(new String[] { "abc"}), mBf.newDefaultBag() };
-        results.put("abc", Util.createTuple(row)); 
+        results.put("abc", Util.createTuple(row));
         row = new Object[] { "def",
                 Util.createBagOfOneColumn(new String[] { "def"}), mBf.newDefaultBag() };
         results.put("def", Util.createTuple(row));
@@ -1044,16 +1029,16 @@ public class TestEvalPipeline {
                 Assert.assertEquals(expected.get(i++), field);
             }
         }
-        
+
         Util.deleteFile(cluster, "table1");
         Util.deleteFile(cluster, "table2");
     }
 
     @Test
     public void testAlgebraicDistinctProgress() throws Exception {
-    
+
         //creating a test input of larger than 1000 to make
-        //sure that progress kicks in. The only way to test this 
+        //sure that progress kicks in. The only way to test this
         //is to add a log statement to the getDistinct
         //method in Distinct.java. There is no automated mechanism
         //to check this from pig
@@ -1066,66 +1051,66 @@ public class TestEvalPipeline {
             inpString[i] = new Integer(i/2).toString();
             inpString[i+1] = new Integer(i/2).toString();
         }
-               
+
         Util.createInputFile(cluster, "table", inpString);
 
         pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
         pigServer.registerQuery("b = group a ALL;");
         pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
         Iterator<Tuple> it = pigServer.openIterator("c");
-     
+
         Integer[] exp = new Integer[inputSize/2];
         for(int j = 0; j < inputSize/2; ++j) {
             exp[j] = j;
         }
 
         DataBag expectedBag = Util.createBagOfOneColumn(exp);
-        
+
         while(it.hasNext()) {
             Tuple tup = it.next();
             Long resultBagSize = (Long)tup.get(0);
             Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
         }
-        
-        Util.deleteFile(cluster, "table");        
+
+        Util.deleteFile(cluster, "table");
     }
 
     @Test
     public void testBinStorageWithLargeStrings() throws Exception {
         // Create input file with large strings
-    	int testSize = 100;
-    	String[] stringArray = new String[testSize];
-    	Random random = new Random();
-    	stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
-    	for(int i = 1; i < stringArray.length; ++i) {
-    		//generate a few large strings every 25th record
-    		if((i % 25) == 0) {
-    			stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);    			
-    		} else {
-    			stringArray[i] = GenRandomData.genRandString(random);
-    		}
-    	}
-        
-    	Util.createInputFile(cluster, "table", stringArray);
-        
-    	//test with BinStorage
+        int testSize = 100;
+        String[] stringArray = new String[testSize];
+        Random random = new Random();
+        stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
+        for(int i = 1; i < stringArray.length; ++i) {
+            //generate a few large strings every 25th record
+            if((i % 25) == 0) {
+                stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
+            } else {
+                stringArray[i] = GenRandomData.genRandString(random);
+            }
+        }
+
+        Util.createInputFile(cluster, "table", stringArray);
+
+        //test with BinStorage
         pigServer.registerQuery("a = load 'table' using PigStorage() " +
                 "as (c: chararray);");
         String output = "/pig/out/TestEvalPipeline-testBinStorageLargeStrings";
         pigServer.deleteFile(output);
         pigServer.store("a", output, BinStorage.class.getName());
-        
+
         pigServer.registerQuery("b = load '" + output +"' using BinStorage() " +
-        "as (c:chararray);");
+                "as (c:chararray);");
         pigServer.registerQuery("c = foreach b generate c;");
-        
+
         Iterator<Tuple> it = pigServer.openIterator("c");
         int counter = 0;
         while(it.hasNext()) {
             Tuple tup = it.next();
             String resultString = (String)tup.get(0);
             String expectedString = stringArray[counter];
-          	Assert.assertTrue(expectedString.equals(resultString));
+            Assert.assertTrue(expectedString.equals(resultString));
             ++counter;
         }
         Util.deleteFile(cluster, "table");

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri May 30 19:07:23 2014
@@ -494,192 +494,6 @@ public class TestEvalPipeline2 {
         Util.deleteFile(cluster, "table_testNestedDescSort");
     }
     
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerParseJoins() throws Exception{
-    	String[] input = {
-                "1\t3",
-                "1\t2"
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
-        
-        // Custom Partitioner is not allowed for skewed joins, will throw a ExecException 
-        try {
-            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
-            pigServer.registerQuery("B = ORDER A by $0;");
-        	pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        	//control should not reach here
-        	Assert.fail("Skewed join cannot accept a custom partitioner");
-        } catch(FrontendException e) {
-        	Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
-		}
-        
-        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        Iterator<Tuple> iter = pigServer.openIterator("hash");
-        Tuple t;
-        
-        Collection<String> results = new HashSet<String>();
-        results.add("(1,3,1,2)");
-        results.add("(1,3,1,3)");
-        results.add("(1,2,1,2)");
-        results.add("(1,2,1,3)");
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        // No checks are made for merged and replicated joins as they are compiled to a map only job 
-        // No frontend error checking has been added for these jobs, hence not adding any test cases 
-        // Manually tested the sanity once. Above test should cover the basic sanity of the scenario 
-        
-        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
-    }
-    
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerGroups() throws Exception{
-        String[] input = {
-                "1\t1",
-                "2\t1",
-                "3\t1",
-                "4\t1"
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
-        
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
-        
-        // It should be noted that for a map reduce job, the total number of partitions 
-        // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein 
-        // we will get more than one reduce job so that we can use the partitioner.
-        // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
-        // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
-        // partition number is bigger than 1.
-        //
-        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-        
-        pigServer.store("B", "tmp_testCustomPartitionerGroups");
-        
-        new File("tmp_testCustomPartitionerGroups").mkdir();
-        
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
-        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
-        String line = null;
-        while((line = reader.readLine()) != null) {
-            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
-        }
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
-        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
-        line = null;
-        int count=0;
-        while((line = reader.readLine()) != null) {
-            //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
-            count++;
-        }
-        Assert.assertEquals(4, count);
-        Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
-        Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
-        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
-    }
-
-    // See PIG-3385
-    @Test
-    public void testCustomPartitionerDistinct() throws Exception{
-        String[] input = {
-                "1\t1",
-                "2\t1",
-                "1\t1",
-                "3\t1",
-                "4\t1",
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
-
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
-        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
-
-        new File("tmp_testCustomPartitionerDistinct").mkdir();
-
-        // SimpleCustomPartitioner3 simply partition all inputs to *second* reducer
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
-        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
-        String line = null;
-        while((line = reader.readLine()) != null) {
-            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
-        }
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
-        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
-        line = null;
-        int count=0;
-        while((line = reader.readLine()) != null) {
-            //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
-            count++;
-        }
-        Assert.assertEquals(4, count);
-        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
-        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
-        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
-    }
-
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerCross() throws Exception{
-    	String[] input = {
-                "1\t3",
-                "1\t2",
-        };
-    	
-        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
-        pigServer.registerQuery("B = ORDER A by $0;");
-        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-        
-        Collection<String> results = new HashSet<String>();
-        results.add("(1,3,1,2)");
-        results.add("(1,3,1,3)");
-        results.add("(1,2,1,2)");
-        results.add("(1,2,1,3)");
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
-    }
-    
     // See PIG-972
     @Test
     public void testDescribeNestedAlias() throws Exception{

Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri May 30 19:07:23 2014
@@ -55,10 +55,10 @@ public class TestFRJoin {
     private static final String INPUT_FILE = "testFrJoinInput.txt";
     private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
     private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     public TestFRJoin() throws ExecException, IOException {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
     }
 
     @Before
@@ -126,11 +126,10 @@ public class TestFRJoin {
             pc.connect();
 
             ld.setPc(pc);
-            Tuple dummyTuple = null;
             for (Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld
                     .getNextTuple()) {
                 Tuple tup = (Tuple)res.result;
-                LoadFunc lf = ((LoadFunc)pc.instantiateFuncFromSpec(ld.getLFile().getFuncSpec()));
+                LoadFunc lf = ((LoadFunc)PigContext.instantiateFuncFromSpec(ld.getLFile().getFuncSpec()));
                 String key = lf.getLoadCaster().bytesToCharArray(
                         ((DataByteArray)tup.get(keyField)).get());
                 Tuple csttup = TupleFactory.getInstance().newTuple(2);
@@ -530,6 +529,7 @@ public class TestFRJoin {
         Schema frjSch = null, shjSch = null;
         pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
         frjSch = pigServer.dumpSchema("C");
+        assertNull(frjSch);
         pigServer.registerQuery("C = join A by $0, B by $0;");
         shjSch = pigServer.dumpSchema("C");
         assertNull(shjSch);
@@ -556,6 +556,7 @@ public class TestFRJoin {
         Schema frjSch = null, shjSch = null;
         pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using 'repl';");
         frjSch = pigServer.dumpSchema("D");
+        assertNull(frjSch);
         pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
         shjSch = pigServer.dumpSchema("D");
         assertNull(shjSch);
@@ -580,6 +581,7 @@ public class TestFRJoin {
         Schema frjSch = null, shjSch = null;
         pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using 'repl';");
         frjSch = pigServer.dumpSchema("C");
+        assertNull(frjSch);
         pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
         shjSch = pigServer.dumpSchema("C");
         assertNull(shjSch);

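The recurring edit in this file, and in most of the files below, is mechanical: MiniCluster becomes the engine-agnostic MiniGenericCluster, and hard-coded ExecType.MAPREDUCE becomes whatever execution type the cluster reports, so the same test can run under MR or Tez (TestFRJoin2 below deliberately stays MR-only). A minimal sketch of the target shape, with an invented class name:

    import java.io.IOException;

    import org.apache.pig.PigServer;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.test.MiniGenericCluster;

    // Illustrative skeleton: no direct reference to ExecType.MAPREDUCE,
    // so the test follows whichever engine the build selects.
    public class EngineAgnosticTestSketch {
        private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
        private PigServer pigServer;

        public EngineAgnosticTestSketch() throws ExecException, IOException {
            pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
        }
    }
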
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Fri May 30 19:07:23 2014
@@ -43,26 +43,30 @@ import org.junit.Test;
 
 public class TestFRJoin2 {
 
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    // This class contains tests for
+    //    - Concatenating small files before adding to DistributedCache (PIG-1458)
+    //    - imposing size limit on files being added to DistributedCache
+    // Since Replicated join in Tez does not use DistributedCache, these tests are MR specific
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_MR);
+
     private static final String INPUT_DIR = "frjoin";
     private static final String INPUT_FILE = "input";
-    
+
     private static final int FILE_MERGE_THRESHOLD = 5;
     private static final int MIN_FILE_MERGE_THRESHOLD = 1;
-    
+
     //contents of input dir joined by comma
     private static String concatINPUT_DIR = null;
     private File logFile;
 
-    
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         StringBuilder strBuilder = new StringBuilder();
         FileSystem fs = cluster.getFileSystem();
         fs.mkdirs(new Path(INPUT_DIR));
         int LOOP_SIZE = 2;
-        for (int i=0; i<FILE_MERGE_THRESHOLD; i++) {        
+        for (int i=0; i<FILE_MERGE_THRESHOLD; i++) {
             String[] input = new String[2*LOOP_SIZE];
             for (int n=0; n<LOOP_SIZE; n++) {
                 for (int j=0; j<LOOP_SIZE;j++) {
@@ -76,7 +80,7 @@ public class TestFRJoin2 {
         }
         strBuilder.deleteCharAt(strBuilder.length() - 1);
         concatINPUT_DIR = strBuilder.toString();
-        
+
         String[] input2 = new String[2*(LOOP_SIZE/2)];
         int k = 0;
         for (int i=1; i<=LOOP_SIZE/2; i++) {
@@ -93,179 +97,175 @@ public class TestFRJoin2 {
         cluster.shutDown();
     }
 
-    // test simple scalar alias with file concatenation following 
+    // test simple scalar alias with file concatenation following
     // a MapReduce job
     @Test
     public void testConcatenateJobForScalar() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
-        
-        // using $0*0, instead of group-all because group-all sets parallelism to 1 
-        pigServer.registerQuery("B = group A by $0*0 parallel 5;"); 
+
+        // using $0*0, instead of group-all because group-all sets parallelism to 1
+        pigServer.registerQuery("B = group A by $0*0 parallel 5;");
         pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.y) as max;");
-        
+
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
                     MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
-            
+
             pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            
+
             JobGraph jGraph = PigStats.get().getJobGraph();
             assertEquals(3, jGraph.size());
-            // find added map-only concatenate job 
+            // find added map-only concatenate job
             MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
-            assertEquals(1, js.getNumberMaps());   
-            assertEquals(0, js.getNumberReduces()); 
+            assertEquals(1, js.getNumberMaps());
+            assertEquals(0, js.getNumberReduces());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
                     "pig.noSplitCombination", "true");
-            
+
             pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
-            
+
             assertEquals(2, PigStats.get().getJobGraph().size());
         }
-        
+
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
-    // test simple scalar alias with file concatenation following 
+
+    // test simple scalar alias with file concatenation following
     // a Map-only job
     @Test
     public void testConcatenateJobForScalar2() throws Exception {
         logFile = Util.resetLog(MRCompiler.class, logFile);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_DIR +  "/{part-00*}" +"' as (x:int,y:int);");
         pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);");
-        
+
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
                     MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
-            
+
             pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            
+
             JobGraph jGraph = PigStats.get().getJobGraph();
             assertEquals(3, jGraph.size());
-            // find added map-only concatenate job 
+            // find added map-only concatenate job
             MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
-            assertEquals(1, js.getNumberMaps());   
-            assertEquals(0, js.getNumberReduces()); 
-            Util.checkLogFileMessage(logFile, 
-                    new String[] {"number of input files: 0", "failed to get number of input files"}, 
+            assertEquals(1, js.getNumberMaps());
+            assertEquals(0, js.getNumberReduces());
+            Util.checkLogFileMessage(logFile,
+                    new String[] {"number of input files: 0", "failed to get number of input files"},
                     false
             );
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
                     "pig.noSplitCombination", "true");
-            
+
             pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
-            
+
             assertEquals(2, PigStats.get().getJobGraph().size());
         }
-        
+
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
-    // test scalar alias with file concatenation following 
+
+    // test scalar alias with file concatenation following
     // a multi-query job
     @Test
     public void testConcatenateJobForScalar3() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("C = group A all parallel 5;");
         pigServer.registerQuery("D = foreach C generate COUNT(A) as count;");
         pigServer.registerQuery("E = foreach C generate MAX(A.x) as max;");
-        
+
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
                     MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
-            
+
             pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;");
             Iterator<Tuple> iter = pigServer.openIterator("F");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            
+
             JobGraph jGraph = PigStats.get().getJobGraph();
             assertEquals(4, jGraph.size());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
                     "pig.noSplitCombination", "true");
-            
+
             pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;");
             Iterator<Tuple> iter = pigServer.openIterator("F");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
-            
+
             assertEquals(2, PigStats.get().getJobGraph().size());
         }
-        
+
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testConcatenateJobForFRJoin() throws Exception {
         logFile = Util.resetLog(MRCompiler.class, logFile);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_DIR +  "/{part-00*}" + "' as (x:int,y:int);");
-        
+
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));            
-            
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+
             pigServer.registerQuery("C = join A by y, B by y using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
-            
+
             assertEquals(3, PigStats.get().getJobGraph().size());
-            Util.checkLogFileMessage(logFile, 
-                    new String[] {"number of input files: 0", "failed to get number of input files"}, 
+            Util.checkLogFileMessage(logFile,
+                    new String[] {"number of input files: 0", "failed to get number of input files"},
                     false
             );
 
@@ -273,28 +273,27 @@ public class TestFRJoin2 {
         {
             pigServer.getPigContext().getProperties().setProperty(
                     "pig.noSplitCombination", "true");
-            
+
             pigServer.registerQuery("C = join A by y, B by y using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
-            
+
             assertEquals(2, PigStats.get().getJobGraph().size());
         }
-        
+
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));  
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-            
+
     @Test
     public void testTooManyReducers() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
-        pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";"); 
+        pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";");
         pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
@@ -302,52 +301,52 @@ public class TestFRJoin2 {
                     MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
             pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 Tuple t = iter.next();
-                dbfrj.add(t);               
+                dbfrj.add(t);
             }
-            
+
             JobGraph jGraph = PigStats.get().getJobGraph();
             assertEquals(3, jGraph.size());
-            // find added map-only concatenate job 
+            // find added map-only concatenate job
             MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
-            assertEquals(1, js.getNumberMaps());   
-            assertEquals(0, js.getNumberReduces());   
+            assertEquals(1, js.getNumberMaps());
+            assertEquals(0, js.getNumberReduces());
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
                     "pig.noSplitCombination", "true");
             pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 Tuple t = iter.next();
-                dbshj.add(t);                
+                dbshj.add(t);
             }
             assertEquals(2, PigStats.get().getJobGraph().size());
-        }        
+        }
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testUnknownNumMaps() throws Exception {
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        
+
         pigServer.registerQuery("A = LOAD '" + concatINPUT_DIR + "' as (x:int,y:int);");
         pigServer.registerQuery("B = Filter A by x < 50;");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));  
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
             pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
- 
+
             JobGraph jGraph = PigStats.get().getJobGraph();
             assertEquals(3, jGraph.size());
         }
@@ -356,30 +355,30 @@ public class TestFRJoin2 {
                     "pig.noSplitCombination", "true");
             pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
             assertEquals(2, PigStats.get().getJobGraph().size());
         }
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testUnknownNumMaps2() throws Exception {
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        
+
         pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("C = join A by x, B by x using 'repl';");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.getPigContext().getProperties().setProperty(
-                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));  
+                    MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
             pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
@@ -392,7 +391,7 @@ public class TestFRJoin2 {
                     "pig.noSplitCombination", "true");
             pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("D");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
@@ -404,8 +403,7 @@ public class TestFRJoin2 {
 
     @Test
     public void testTooBigReplicatedFile() throws Exception {
-        PigServer pigServer = new PigServer(
-                ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
         pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");

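Every test in TestFRJoin2 follows the same two-run shape: execute the query with the concatenation threshold set (and assert the extra map-only concatenate job appears in the JobGraph), then rerun the same query with pig.noSplitCombination as the baseline, and require identical result bags. A condensed sketch of that harness, with a hypothetical runAndCollect helper standing in for the register/openIterator loops repeated above:

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.PigServer;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.test.utils.TestHelper;

    public class ConcatHarnessSketch {

        // Hypothetical helper: register the query and drain the iterator into a bag.
        static DataBag runAndCollect(PigServer pigServer, String query, String alias)
                throws Exception {
            pigServer.registerQuery(query);
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            Iterator<Tuple> iter = pigServer.openIterator(alias);
            while (iter.hasNext()) {
                bag.add(iter.next());
            }
            return bag;
        }

        static void compareRuns(PigServer pigServer) throws Exception {
            Properties props = pigServer.getPigContext().getProperties();

            // First run: small replicated inputs get merged by an added map-only job.
            props.setProperty(MRCompiler.FILE_CONCATENATION_THRESHOLD, "1");
            DataBag dbfrj = runAndCollect(pigServer,
                    "C = join A by $0, B by $0 using 'repl';", "C");

            // Second run: suppress split combination and rerun the same join as the baseline.
            props.setProperty("pig.noSplitCombination", "true");
            DataBag dbshj = runAndCollect(pigServer,
                    "C = join A by $0, B by $0 using 'repl';", "C");

            if (dbfrj.size() != dbshj.size() || !TestHelper.compareBags(dbfrj, dbshj)) {
                throw new AssertionError("concatenated and baseline runs disagree");
            }
        }
    }
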
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java Fri May 30 19:07:23 2014
@@ -20,22 +20,20 @@ package org.apache.pig.test;
 
 import java.util.Iterator;
 
-import junit.framework.Assert;
-
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestFRJoinNullValue {
 
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         String[] input = new String[4];
@@ -52,14 +50,14 @@ public class TestFRJoinNullValue {
 
     @Test
     public void testNullMatch() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("A = LOAD 'input';");
         pigServer.registerQuery("B = LOAD 'input';");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
@@ -67,25 +65,25 @@ public class TestFRJoinNullValue {
         {
             pigServer.registerQuery("C = join A by $0, B by $0;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
         }
         Assert.assertEquals(dbfrj.size(), dbshj.size());
-        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testTupleNullMatch() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int,z:int);");
         pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int,z:int);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.registerQuery("C = join A by (x, y), B by (x, y) using 'replicated';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
@@ -93,25 +91,25 @@ public class TestFRJoinNullValue {
         {
             pigServer.registerQuery("C = join A by (x, y), B by (x, y);");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
         }
         Assert.assertEquals(dbfrj.size(), dbshj.size());
-        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testLeftNullMatch() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int, z:int);");
         pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int, z:int);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
@@ -119,25 +117,25 @@ public class TestFRJoinNullValue {
         {
             pigServer.registerQuery("C = join A by $0 left, B by $0;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
         }
         Assert.assertEquals(dbfrj.size(), dbshj.size());
-        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
-    
+
     @Test
     public void testTupleLeftNullMatch() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int,z:int);");
         pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int,z:int);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
             pigServer.registerQuery("C = join A by (x, y) left, B by (x, y) using 'replicated';");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbfrj.add(iter.next());
             }
@@ -145,12 +143,12 @@ public class TestFRJoinNullValue {
         {
             pigServer.registerQuery("C = join A by (x, y) left, B by (x, y);");
             Iterator<Tuple> iter = pigServer.openIterator("C");
-            
+
             while(iter.hasNext()) {
                 dbshj.add(iter.next());
             }
         }
         Assert.assertEquals(dbfrj.size(), dbshj.size());
-        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
 }

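TestFRJoinNullValue pins replicated join to the shuffle join's null-key semantics: null keys match nothing in an inner join, while a left outer join still emits the null-keyed left rows. A standalone sketch of the inner-join half, assuming standard null-join semantics and an invented local input file:

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    // Illustrative local-mode check: a row whose join key is null joins to
    // nothing in an inner join, replicated or not.
    public class NullKeySketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // 'nulls.txt' is an invented input whose only field is empty,
            // which PigStorage loads as a null int.
            pig.registerQuery("A = LOAD 'nulls.txt' as (x:int);");
            pig.registerQuery("B = LOAD 'nulls.txt' as (x:int);");
            pig.registerQuery("C = join A by x, B by x using 'replicated';");
            Iterator<Tuple> iter = pig.openIterator("C");
            if (iter.hasNext()) {
                throw new AssertionError("null keys must not match in an inner join");
            }
        }
    }
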
Modified: pig/trunk/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFilterUDF.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFilterUDF.java Fri May 30 19:07:23 2014
@@ -17,7 +17,6 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -27,7 +26,6 @@ import java.io.PrintWriter;
 import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -40,7 +38,7 @@ import org.junit.Test;
 
 public class TestFilterUDF {
     private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private File tmpFile;
 
     TupleFactory tf = TupleFactory.getInstance();
@@ -57,7 +55,7 @@ public class TestFilterUDF {
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         int LOOP_SIZE = 20;
         tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));

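TestFilterUDF drives user-defined filters through whichever engine the cluster provides; the standard base class for such UDFs is FilterFunc, an EvalFunc over Boolean. A minimal sketch with an invented class name and predicate:

    import java.io.IOException;

    import org.apache.pig.FilterFunc;
    import org.apache.pig.data.Tuple;

    // Illustrative filter UDF: keep tuples whose first field is a positive int.
    public class IsPositive extends FilterFunc {
        @Override
        public Boolean exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                return false;
            }
            return ((Integer) input.get(0)) > 0;
        }
    }

Applied as, for example: B = FILTER A BY IsPositive($0); after REGISTERing the jar that contains the class.
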
Modified: pig/trunk/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFinish.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFinish.java Fri May 30 19:07:23 2014
@@ -34,7 +34,6 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,7 +45,7 @@ public class TestFinish {
     BagFactory mBf = BagFactory.getInstance();
     File f1;
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     static public class MyEvalFunction extends EvalFunc<Tuple> {
         String execType;
@@ -78,10 +77,8 @@ public class TestFinish {
 
     @Before
     public void setUp() throws Exception {
-        // re initialize FileLocalizer so that each test runs correctly without
-        // any side effect of other tests - this is needed here since some
-        // tests are in mapred and some in local mode
-        FileLocalizer.setInitialized(false);
+        // Reset state since some tests are in mapred and some in local mode
+        Util.resetStateForExecModeSwitch();
     }
 
     @AfterClass
@@ -102,7 +99,7 @@ public class TestFinish {
             }
             ps.close();
         } else {
-            pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
             f1 = File.createTempFile("test", "txt");
             f1.delete();
             inputFileName = Util.removeColon(f1.getAbsolutePath());
@@ -118,7 +115,7 @@ public class TestFinish {
 
     private void checkAndCleanup(ExecType execType, String expectedFileName,
             String inputFileName) throws IOException {
-        if (execType == ExecType.MAPREDUCE) {
+        if (execType == cluster.getExecType()) {
             FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                     cluster.getProperties()));
             assertTrue(fs.exists(new Path(expectedFileName)));
@@ -136,7 +133,7 @@ public class TestFinish {
 
     @Test
     public void testFinishInMapMR() throws Exception {
-        String inputFileName = setUp(ExecType.MAPREDUCE);
+        String inputFileName = setUp(cluster.getExecType());
         // this file will be created on the cluster if finish() is called
         String expectedFileName = "testFinishInMapMR-finish.txt";
         pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
@@ -149,13 +146,13 @@ public class TestFinish {
             iter.next();
         }
 
-        checkAndCleanup(ExecType.MAPREDUCE, expectedFileName, inputFileName);
+        checkAndCleanup(cluster.getExecType(), expectedFileName, inputFileName);
 
     }
 
     @Test
     public void testFinishInReduceMR() throws Exception {
-        String inputFileName = setUp(ExecType.MAPREDUCE);
+        String inputFileName = setUp(cluster.getExecType());
         // this file will be created on the cluster if finish() is called
         String expectedFileName = "testFinishInReduceMR-finish.txt";
         pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
@@ -169,7 +166,7 @@ public class TestFinish {
             iter.next();
         }
 
-        checkAndCleanup(ExecType.MAPREDUCE, expectedFileName, inputFileName);
+        checkAndCleanup(cluster.getExecType(), expectedFileName, inputFileName);
     }
 
     @Test