Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [11/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Fri Feb 24 03:34:37 2017
@@ -30,17 +30,17 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
 public class TestCounters {
     String file = "input.txt";
 
-    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
-
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    
     final int MAX = 100*1000;
     Random r = new Random();
 
@@ -59,7 +59,7 @@ public class TestCounters {
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-
+    
     @Test
     public void testMapOnly() throws IOException, ExecException {
         int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
             if(t > 50) count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only");
         PigStats pigStats = job.getStatistics();
-
+        
         //counting the no. of bytes in the output file
         //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
-
+        
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output_map_only"), true);
 
@@ -98,7 +98,7 @@ public class TestCounters {
         JobGraph jg = pigStats.getJobGraph();
         Iterator<JobStats> iter = jg.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
+            MRJobStats js = (MRJobStats) iter.next();                    
 
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
                 count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
         PigStats pigStats = job.getStatistics();
-
+        
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
                 "output_map_only", pigServer.getPigContext()),
                 pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
-
+            MRJobStats js = (MRJobStats) iter.next();
+        
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
             assertEquals(0, js.getReduceInputRecords());
             assertEquals(0, js.getReduceOutputRecords());
         }
-
+            
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
@@ -183,7 +183,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
+            MRJobStats js = (MRJobStats) iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
-
+        
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
@@ -266,7 +266,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
+            MRJobStats js = (MRJobStats) iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
-
+ 
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-
+        
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
+            MRJobStats js = (MRJobStats) iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-
+     
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
         int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
 
         ExecJob job = pigServer.store("c", "output", "BinStorage");
         PigStats pigStats = job.getStatistics();
-
+        
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-
+ 
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            JobStats js = iter.next();
+            MRJobStats js = (MRJobStats) iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,8 +399,6 @@ public class TestCounters {
 
     @Test
     public void testMultipleMRJobs() throws IOException, ExecException {
-        Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
-                Util.isMapredExecType(cluster.getExecType()));
         int count = 0;
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
         int [] nos = new int[10];
@@ -415,38 +413,38 @@ public class TestCounters {
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) {
+        for(int i = 0; i < 10; i++) { 
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
         ExecJob job = pigServer.store("d", "output");
         PigStats pigStats = job.getStatistics();
-
+        
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-
+        
         is.close();
-
+        
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
-
+        
         System.out.println("============================================");
         System.out.println("Test case MultipleMRJobs");
         System.out.println("============================================");
-
+        
         JobGraph jp = pigStats.getJobGraph();
-        JobStats js = (JobStats)jp.getSinks().get(0);
-
+        MRJobStats js = (MRJobStats)jp.getSinks().get(0);
+        
         System.out.println("Job id: " + js.getName());
         System.out.println(jp.toString());
-
+        
         System.out.println("Map input records : " + js.getMapInputRecords());
         assertEquals(MAX, js.getMapInputRecords());
         System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -455,12 +453,12 @@ public class TestCounters {
         assertEquals(count, js.getReduceInputRecords());
         System.out.println("Reduce output records : " + js.getReduceOutputRecords());
         assertEquals(count, js.getReduceOutputRecords());
-
+        
         System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
         assertEquals(filesize, js.getHdfsBytesWritten());
 
     }
-
+    
     @Test
     public void testMapOnlyMultiQueryStores() throws Exception {
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -469,8 +467,8 @@ public class TestCounters {
             pw.println(t);
         }
         pw.close();
-
-        PigServer pigServer = new PigServer(cluster.getExecType(),
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -481,22 +479,22 @@ public class TestCounters {
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
         assertTrue(stats.getOutputLocations().size() == 2);
-
+        
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
-
+        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
+        
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-
-        assertEquals(MAX, counter);
-    }
-
+        
+        assertEquals(MAX, counter);       
+    }    
+    
     @Test
     public void testMultiQueryStores() throws Exception {
         int[] nums = new int[100];
@@ -507,13 +505,13 @@ public class TestCounters {
             nums[t]++;
         }
         pw.close();
-
+        
         int groups = 0;
         for (int i : nums) {
             if (i > 0) groups++;
         }
-
-        PigServer pigServer = new PigServer(cluster.getExecType(),
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -527,29 +525,29 @@ public class TestCounters {
         pigServer.registerQuery("store g into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
-
+        
         assertTrue(stats.getOutputLocations().size() == 2);
-
+               
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
-
+        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
+        
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-
-        assertEquals(groups, counter);
-    }
-
-    /*
+        
+        assertEquals(groups, counter);       
+    }    
+    
+    /*    
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
      * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
-     */
+     */ 
 //    @Test
 //    public void testLocal() throws IOException, ExecException {
 //        int count = 0;
@@ -568,7 +566,7 @@ public class TestCounters {
 //        }
 //        pw.close();
 //
-//        for(int i = 0; i < 10; i++)
+//        for(int i = 0; i < 10; i++) 
 //            if(nos[i] > 0)
 //                count ++;
 //
@@ -582,56 +580,56 @@ public class TestCounters {
 //        pigServer.registerQuery("c = group b by $0;");
 //        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
 //        PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
+//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
 //        long filesize = 0;
 //        while(is.read() != -1) filesize++;
-//
+//        
 //        is.close();
 //        out.delete();
-//
+//        
 //        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//
+//        
 //        assertEquals(10, pigStats.getRecordsWritten());
 //        assertEquals(110, pigStats.getBytesWritten());
 //
 //    }
 
     @Test
-    public void testJoinInputCounters() throws Exception {
+    public void testJoinInputCounters() throws Exception {        
         testInputCounters("join");
     }
-
+    
     @Test
-    public void testCogroupInputCounters() throws Exception {
+    public void testCogroupInputCounters() throws Exception {        
         testInputCounters("cogroup");
     }
-
+    
     @Test
-    public void testSkewedInputCounters() throws Exception {
+    public void testSkewedInputCounters() throws Exception {        
         testInputCounters("skewed");
     }
-
+    
     @Test
-    public void testSelfJoinInputCounters() throws Exception {
+    public void testSelfJoinInputCounters() throws Exception {        
         testInputCounters("self-join");
     }
-
+    
     private static boolean multiInputCreated = false;
-
+    
     private static int count = 0;
-
-    private void testInputCounters(String keyword) throws Exception {
+            
+    private void testInputCounters(String keyword) throws Exception {  
         String file1 = "multi-input1.txt";
         String file2 = "multi-input2.txt";
-
+        
         String output = keyword;
-
+        
         if (keyword.equals("self-join")) {
             file2 = file1;
             keyword = "join";
         }
-
-        final int MAX_NUM_RECORDS = 100;
+         
+        final int MAX_NUM_RECORDS = 100; 
         if (!multiInputCreated) {
             PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -639,7 +637,7 @@ public class TestCounters {
                 pw.println(t);
             }
             pw.close();
-
+                        
             PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
                 int t = r.nextInt(100);
@@ -651,8 +649,8 @@ public class TestCounters {
             pw2.close();
             multiInputCreated = true;
         }
-
-        PigServer pigServer = new PigServer(cluster.getExecType(),
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file1 + "';");
@@ -663,7 +661,7 @@ public class TestCounters {
             pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
         }
         ExecJob job = pigServer.store("c", output + "_output");
-
+        
         PigStats stats = job.getStatistics();
         assertTrue(stats.isSuccessful());
         List<InputStats> inputs = stats.getInputStats();
@@ -682,46 +680,4 @@ public class TestCounters {
             }
         }
     }
-
-    @Test
-    public void testSplitUnionOutputCounters() throws Exception {
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
-        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
-        for (int i = 0; i < 10; i++) {
-            pw.println(i);
-        }
-        pw.close();
-        String query =
-                "a = load 'splitunion-input';" +
-                "split a into b if $0 < 5, c otherwise;" +
-                "d = union b, c;";
-
-        pigServer.registerQuery(query);
-
-        ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
-        PigStats stats1 = job.getStatistics();
-
-        query =
-                "a = load 'splitunion-input';" +
-                "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
-                "e = distinct d;" +
-                "f = union b, c, e;";
-
-        pigServer.registerQuery(query);
-
-        job = pigServer.store("f", "splitunion-output-1", "PigStorage");
-        PigStats stats2 = job.getStatistics();
-
-        PigStats[] pigStats = new PigStats[]{stats1, stats2};
-        for (int i = 0; i < 2; i++) {
-            PigStats stats = pigStats[i];
-            assertTrue(stats.isSuccessful());
-            List<OutputStats> outputs = stats.getOutputStats();
-            assertEquals(1, outputs.size());
-            OutputStats output = outputs.get(0);
-            assertEquals("splitunion-output-" + i, output.getName());
-            assertEquals(10, output.getNumberRecords());
-            assertEquals(20, output.getBytes());
-        }
-    }
 }
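
For reference, a minimal standalone sketch (not part of the patch; the class name is invented for illustration) of the pattern this revision applies throughout TestCounters: construct the PigServer with ExecType.MAPREDUCE and downcast each JobStats to MRJobStats, since the map/reduce record counters are declared on the MapReduce-specific subclass rather than on the JobStats base class.

    import java.util.Iterator;

    import org.apache.pig.tools.pigstats.JobStats;
    import org.apache.pig.tools.pigstats.PigStats;
    import org.apache.pig.tools.pigstats.PigStats.JobGraph;
    import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;

    class MapCounterCheck {
        // Walk the job graph of a finished script and print the MR map counters.
        static void printMapCounters(PigStats pigStats) {
            JobGraph jg = pigStats.getJobGraph();
            Iterator<JobStats> iter = jg.iterator();
            while (iter.hasNext()) {
                // getMapInputRecords()/getMapOutputRecords() live on MRJobStats,
                // hence the downcast the diff adds at each call site.
                MRJobStats js = (MRJobStats) iter.next();
                System.out.println("Map input records : " + js.getMapInputRecords());
                System.out.println("Map output records : " + js.getMapOutputRecords());
            }
        }
    }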

Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Fri Feb 24 03:34:37 2017
@@ -17,36 +17,17 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
+import java.util.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.TreeSet;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DistinctDataBag;
-import org.apache.pig.data.InternalCachedBag;
-import org.apache.pig.data.InternalDistinctBag;
-import org.apache.pig.data.InternalSortedBag;
-import org.apache.pig.data.NonSpillableDataBag;
-import org.apache.pig.data.SingleTupleBag;
-import org.apache.pig.data.SortedDataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+
+
+import org.apache.pig.data.*;
 import org.apache.pig.impl.util.Spillable;
 import org.junit.After;
 import org.junit.Test;
@@ -55,7 +36,7 @@ import org.junit.Test;
 /**
  * This class will exercise the basic Pig data model and members. It tests for proper behavior in
  * assignment and comparison, as well as function application.
- *
+ * 
  * @author dnm
  */
 public class TestDataBag  {
@@ -609,7 +590,7 @@ public class TestDataBag  {
             }
             mgr.forceSpill();
         }
-
+        
        assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
 
         // Read tuples back, hopefully they come out in the same order.
@@ -738,14 +719,14 @@ public class TestDataBag  {
     @Test
     public void testDefaultBagFactory() throws Exception {
         BagFactory f = BagFactory.getInstance();
-
+       
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
-        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));         
     }
 
     @Test
@@ -775,7 +756,7 @@ public class TestDataBag  {
         try {
             BagFactory f = BagFactory.getInstance();
         } catch (RuntimeException re) {
-            assertEquals("Expected does not extend BagFactory message",
+            assertEquals("Expected does not extend BagFactory message", 
                 "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
                 re.getMessage());
             caughtIt = true;
@@ -794,7 +775,7 @@ public class TestDataBag  {
 
         BagFactory.resetSelf();
     }
-
+    
     @Test
     public void testNonSpillableDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -808,7 +789,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-
+    
     @Test
     public void testNonSpillableDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -823,7 +804,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-
+    
     @Test
     public void testDefaultDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -839,7 +820,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-
+    
     @Test
     public void testDefaultDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -856,35 +837,35 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-
-    public void testInternalCachedBag() throws Exception {
+    
+    public void testInternalCachedBag() throws Exception {    
     	// check adding empty tuple
     	DataBag bg0 = new InternalCachedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-
+    	
     	// check equal of bags
     	DataBag bg1 = new InternalCachedBag(1, 0.5f);
     	assertEquals(bg1.size(), 0);
-
+    	
     	String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-
+    	
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-
+    	
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalCachedBag(1, 0.5f);
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-
+        
         // check bag with data written to disk
         DataBag bg3 = new InternalCachedBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -892,7 +873,7 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-
+        
         // check iterator
         Iterator<Tuple> iter = bg3.iterator();
         DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -900,7 +881,7 @@ public class TestDataBag  {
         	bg4.add(iter.next());
         }
         assertEquals(bg3, bg4);
-
+        
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
@@ -913,46 +894,46 @@ public class TestDataBag  {
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
         assertEquals(bg3, bg5);
-
-
+        
+        
         bg4.clear();
-        assertEquals(bg4.size(), 0);
+        assertEquals(bg4.size(), 0);        
     }
-
-    public void testInternalSortedBag() throws Exception {
-
+    
+    public void testInternalSortedBag() throws Exception {    
+    	
     	// check adding empty tuple
     	DataBag bg0 = new InternalSortedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-
+    	
     	// check equal of bags
     	DataBag bg1 = new InternalSortedBag();
     	assertEquals(bg1.size(), 0);
-
+    	
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-
+    	
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertTrue(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-
+    	
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalSortedBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-
+        
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+        
         // check bag with data written to disk
         DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -960,17 +941,17 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-
+        
         iter = bg3.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
-        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));                
+        
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-
+        
         DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -978,21 +959,21 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);
-
+        assertEquals(bg3, bg4);        
+        
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-
+        
         // test with all data spill out
-        DataBag bg5 = new InternalSortedBag();
+        DataBag bg5 = new InternalSortedBag();        
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}     
         	bg5.spill();
         }
-
+        
         assertEquals(bg5.size(), 9);
         iter = bg5.iterator();
         for(int i=0; i<3; i++) {
@@ -1002,21 +983,21 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<3; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
         }
-
+        
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalSortedBag();
+        DataBag bg6 = new InternalSortedBag();        
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}        	
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-
+        
         assertEquals(bg6.size(), 104*3);
         iter = bg6.iterator();
         for(int i=0; i<104; i++) {
@@ -1026,55 +1007,55 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<104; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
         }
-
+        
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new SortedDataBag(null);
+        DataBag bg7 = new SortedDataBag(null);        
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}        	
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-
-    public void testInternalDistinctBag() throws Exception {
+    
+    public void testInternalDistinctBag() throws Exception {    
     	// check adding empty tuple
     	DataBag bg0 = new InternalDistinctBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 1);
-
+    	
     	// check equal of bags
     	DataBag bg1 = new InternalDistinctBag();
     	assertEquals(bg1.size(), 0);
-
+    	
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-
+    	
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertTrue(bg1.isDistinct());
-
+    	
     	tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
     	DataBag bg2 = new InternalDistinctBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-
+        
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+        
         // check bag with data written to disk
         DataBag bg3 = new InternalDistinctBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1083,13 +1064,13 @@ public class TestDataBag  {
         }
         assertEquals(bg2, bg3);
         assertEquals(bg3.size(), 3);
-
-
+              
+        
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-
+        
         DataBag bg4 = new InternalDistinctBag(1, 0.0f);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -1097,73 +1078,73 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);
-
+        assertEquals(bg3, bg4);        
+        
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-
+        
         // test with all data spill out
-        DataBag bg5 = new InternalDistinctBag();
+        DataBag bg5 = new InternalDistinctBag();        
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}        
         	bg5.spill();
         }
-
+        
         assertEquals(bg5.size(), 3);
-
-
+    
+        
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalDistinctBag();
+        DataBag bg6 = new InternalDistinctBag();        
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}        	
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-
-        assertEquals(bg6.size(), 3);
-
+        
+        assertEquals(bg6.size(), 3);       
+        
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new DistinctDataBag();
+        DataBag bg7 = new DistinctDataBag();        
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}
+        	}        	
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-
+    
     // See PIG-1231
     @Test
     public void testDataBagIterIdempotent() throws Exception {
         DataBag bg0 = new DefaultDataBag();
         processDataBag(bg0, true);
-
+        
         DataBag bg1 = new DistinctDataBag();
         processDataBag(bg1, true);
-
+        
         DataBag bg2 = new InternalDistinctBag();
         processDataBag(bg2, true);
-
+        
         DataBag bg3 = new InternalSortedBag();
         processDataBag(bg3, true);
-
+        
         DataBag bg4 = new SortedDataBag(null);
         processDataBag(bg4, true);
-
+        
         DataBag bg5 = new InternalCachedBag(0, 0);
         processDataBag(bg5, false);
     }
-
+    
     // See PIG-1285
     @Test
     public void testSerializeSingleTupleBag() throws Exception {
@@ -1178,7 +1159,7 @@ public class TestDataBag  {
         dfBag.readFields(dis);
         assertTrue(dfBag.equals(stBag));
     }
-
+    
     // See PIG-2550
     static class MyCustomTuple extends DefaultTuple {
         private static final long serialVersionUID = 8156382697467819543L;
@@ -1203,23 +1184,7 @@ public class TestDataBag  {
         Tuple t2 = iter.next();
         assertTrue(t2.equals(t));
     }
-
-    // See PIG-4260
-    @Test
-    public void testSpillArrayBackedList() throws Exception {
-        Tuple[] tuples = new Tuple[2];
-        tuples[0] = TupleFactory.getInstance().newTuple(1);
-        tuples[0].set(0, "first");
-        tuples[1] = TupleFactory.getInstance().newTuple(1);
-        tuples[1].set(0, "second");
-        DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
-        bag.spill();
-        Iterator<Tuple> iter = bag.iterator();
-        assertEquals(tuples[0], iter.next());
-        assertEquals(tuples[1], iter.next());
-        assertFalse(iter.hasNext());
-    }
-
+    
     void processDataBag(DataBag bg, boolean doSpill) {
         Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
         bg.add(t);
@@ -1229,7 +1194,7 @@ public class TestDataBag  {
         assertTrue(iter.hasNext());
         iter.next();
         assertFalse(iter.hasNext());
-        assertFalse("hasNext should be idempotent", iter.hasNext());
+        assertFalse("hasNext should be idempotent", iter.hasNext());        
     }
 }
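
The TestDataBag hunks above also drop the PIG-4260 regression test (testSpillArrayBackedList), which pinned down the spill-then-iterate contract for a bag built over an array-backed list. As a reminder of that contract, here is a minimal sketch assembled from the deleted test's own API calls (illustrative only; the class name is invented):

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.pig.data.DefaultDataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    class BagSpillSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "first");
            DefaultDataBag bag = new DefaultDataBag(Arrays.asList(t));
            bag.spill(); // force the backing list out to disk
            // Iteration must still return the spilled tuple exactly once.
            Iterator<Tuple> it = bag.iterator();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }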
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Fri Feb 24 03:34:37 2017
@@ -20,9 +20,6 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.math.RoundingMode;
 import java.util.Map;
 import java.util.Random;
 
@@ -56,7 +53,7 @@ public class TestDivide {
     public void testOperator() throws ExecException {
         // int TRIALS = 10;
         byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
-                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL,
+                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
                         DataType.DATETIME, DataType.MAP, DataType.TUPLE };
         // Map<Byte,String> map = GenRandomData.genTypeToNameMap();
         System.out.println("Testing DIVIDE operator");
@@ -253,33 +250,6 @@ public class TestDivide {
                 assertEquals(null, (Long)resl.result);
                 break;
             }
-            case DataType.BIGDECIMAL: {
-                MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP);
-                BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc);
-                BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc);
-                lt.setValue(inpf1);
-                rt.setValue(inpf2);
-                Result resf = op.getNextBigDecimal();
-                BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
-                assertEquals(expected, (BigDecimal)resf.result);
-
-                // test with null in lhs
-                lt.setValue(null);
-                rt.setValue(inpf2);
-                resf = op.getNextBigDecimal();
-                assertEquals(null, (BigDecimal)resf.result);
-                // test with null in rhs
-                lt.setValue(inpf1);
-                rt.setValue(null);
-                resf = op.getNextBigDecimal();
-                assertEquals(null, (BigDecimal)resf.result);
-                // test divide by 0
-                lt.setValue(inpf1);
-                rt.setValue(new BigDecimal(0.0f,mc));
-                resf = op.getNextBigDecimal();
-                assertEquals(null, (BigDecimal)resf.result);
-                break;
-            }
             case DataType.DATETIME:
                 DateTime inpdt1 = new DateTime(r.nextLong());
                 DateTime inpdt2 = new DateTime(r.nextLong());
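
The deleted BIGDECIMAL branch is the only divide case that needs an explicit scale: BigDecimal.divide throws ArithmeticException on non-terminating quotients unless a scale and rounding mode are supplied. A standalone sketch of the computation the removed test performed (illustrative; the scale value here is an assumption, the test read it from Divide.BIGDECIMAL_MINIMAL_SCALE):

    import java.math.BigDecimal;
    import java.math.MathContext;
    import java.math.RoundingMode;

    class BigDecimalDivideSketch {
        // Assumed stand-in for Divide.BIGDECIMAL_MINIMAL_SCALE.
        static final int MINIMAL_SCALE = 6;

        public static void main(String[] args) {
            MathContext mc = new MathContext(MINIMAL_SCALE, RoundingMode.HALF_UP);
            BigDecimal lhs = new BigDecimal(0.7, mc);
            BigDecimal rhs = new BigDecimal(0.2, mc);
            // Same scale and rounding the deleted test used for its expected value.
            BigDecimal quotient = lhs.divide(rhs, 2 * MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
            System.out.println(quotient);
        }
    }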

Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Fri Feb 24 03:34:37 2017
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.io.PrintWriter;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
@@ -38,15 +38,16 @@ import org.junit.Test;
 
 public class TestEmptyInputDir {
 
-    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+    private static MiniCluster cluster; 
     private static final String EMPTY_DIR = "emptydir";
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
     private static final String PIG_FILE = "test.pig";
 
-
+    
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
+        cluster = MiniCluster.buildCluster();
         FileSystem fs = cluster.getFileSystem();
         if (!fs.mkdirs(new Path(EMPTY_DIR))) {
             throw new Exception("failed to create empty dir");
@@ -63,35 +64,7 @@ public class TestEmptyInputDir {
     public static void tearDownAfterClass() throws Exception {
         cluster.shutDown();
     }
-
-    @Test
-    public void testGroupBy() throws Exception {
-        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
-        w.println("A = load '" + EMPTY_DIR + "';");
-        w.println("B = group A by $0;");
-        w.println("store B into '" + OUTPUT_FILE + "';");
-        w.close();
-
-        try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
-            PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-
-            // This assert fails on 205 due to MAPREDUCE-3606
-            if (Util.isMapredExecType(cluster.getExecType())
-                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
-                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
-                assertEquals(0, js.getNumberMaps());
-            }
-
-            assertEmptyOutputFile();
-        } finally {
-            new File(PIG_FILE).delete();
-            Util.deleteFile(cluster, OUTPUT_FILE);
-        }
-    }
-
+    
     @Test
     public void testSkewedJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -100,28 +73,31 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0, A by $0 using 'skewed';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
+     
             assertTrue(stats.isSuccessful());
-
+            // the sampler job has zero maps
+            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+            
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (Util.isMapredExecType(cluster.getExecType())
-                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
-                // the sampler job has zero maps
-                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
-                assertEquals(0, js.getNumberMaps());
-            }
-
-            assertEmptyOutputFile();
+            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+                assertEquals(0, js.getNumberMaps()); 
+            
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+            assertTrue(status.isDir());
+            assertEquals(0, status.getLen());
+            // output directory isn't empty
+            assertTrue(fs.listStatus(status.getPath()).length > 0);
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-
+    
     @Test
     public void testMergeJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -130,28 +106,32 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'merge';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-
+     
+            assertTrue(stats.isSuccessful());    
+            // the indexer job has zero maps
+            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+            
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (Util.isMapredExecType(cluster.getExecType())
-                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
-                // the indexer job has zero maps
-                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
-                assertEquals(0, js.getNumberMaps());
-            }
-
-            assertEmptyOutputFile();
+            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+                assertEquals(0, js.getNumberMaps()); 
+            
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+            assertTrue(status.isDir());
+            assertEquals(0, status.getLen());
+            
+            // output directory isn't empty
+            assertTrue(fs.listStatus(status.getPath()).length > 0);            
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-
+    
     @Test
     public void testFRJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -160,44 +140,55 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'repl';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-
+     
+            assertTrue(stats.isSuccessful());    
+            // the indexer job has zero maps
+            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+            
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (Util.isMapredExecType(cluster.getExecType())
-                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
-                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
-                assertEquals(0, js.getNumberMaps());
-            }
-
-            assertEmptyOutputFile();
+            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+                assertEquals(0, js.getNumberMaps()); 
+            
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+            assertTrue(status.isDir());
+            assertEquals(0, status.getLen());
+            
+            // output directory isn't empty
+            assertTrue(fs.listStatus(status.getPath()).length > 0);            
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-
+    
     @Test
     public void testRegularJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "';");
         w.println("B = load '" + EMPTY_DIR + "';");
-        w.println("C = join B by $0, A by $0 PARALLEL 0;");
+        w.println("C = join B by $0, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-
-            assertEmptyOutputFile();
-
+     
+            assertTrue(stats.isSuccessful());   
+            
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+            assertTrue(status.isDir());
+            assertEquals(0, status.getLen());
+            
+            // output directory isn't empty
+            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -212,19 +203,19 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 right outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
+     
+            assertTrue(stats.isSuccessful());               
+            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));                  
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-
+    
     @Test
     public void testLeftOuterJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -233,88 +224,16 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 left outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-
-        try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
-            PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
-        } finally {
-            new File(PIG_FILE).delete();
-            Util.deleteFile(cluster, OUTPUT_FILE);
-        }
-    }
-
-    @Test
-    public void testBloomJoin() throws Exception {
-        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
-        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
-        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
-        w.println("C = join B by $0, A by $0 using 'bloom';");
-        w.println("D = join A by $0, B by $0 using 'bloom';");
-        w.println("store C into '" + OUTPUT_FILE + "';");
-        w.println("store D into 'output1';");
-        w.close();
-
-        try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
-            PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
-            assertEquals(0, stats.getNumberRecords("output1"));
-            assertEmptyOutputFile();
-        } finally {
-            new File(PIG_FILE).delete();
-            Util.deleteFile(cluster, OUTPUT_FILE);
-            Util.deleteFile(cluster, "output1");
-        }
-    }
-
-    @Test
-    public void testBloomJoinOuter() throws Exception {
-        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
-        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
-        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
-        w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
-        w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
-        w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
-        w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
-        w.println("store C into '" + OUTPUT_FILE + "';");
-        w.println("store D into 'output1';");
-        w.println("store E into 'output2';");
-        w.println("store F into 'output3';");
-        w.close();
-
+        
         try {
-            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            String[] args = { PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
-
-            assertTrue(stats.isSuccessful());
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
-            assertEquals(2, stats.getNumberRecords("output1"));
-            assertEquals(2, stats.getNumberRecords("output2"));
-            assertEquals(0, stats.getNumberRecords("output3"));
-            assertEmptyOutputFile();
+     
+            assertTrue(stats.isSuccessful());               
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));                  
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
-            Util.deleteFile(cluster, "output1");
-            Util.deleteFile(cluster, "output2");
-            Util.deleteFile(cluster, "output3");
         }
     }
-
-    private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
-        FileSystem fs = cluster.getFileSystem();
-        FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-        assertTrue(status.isDir());
-        assertEquals(0, status.getLen());
-        // output directory isn't empty. Has one empty file
-        FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
-        assertEquals(1, files.length);
-        assertEquals(0, files[0].getLen());
-        assertTrue(files[0].getPath().getName().startsWith("part-"));
-    }
 }

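A note on the inlined checks above: a Pig job over an empty input directory is expected to succeed and still create its output directory, containing only zero-length part files (plus a success marker). A minimal sketch of that verification, assuming a Hadoop FileSystem handle from the test's mini cluster; the helper name assertEmptyOutput is illustrative, not part of the test:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    // Illustrative helper: the store target must exist as a directory,
    // carry no data itself, and contain only empty part files.
    static void assertEmptyOutput(FileSystem fs, String outputDir) throws IOException {
        FileStatus status = fs.getFileStatus(new Path(outputDir));
        assertTrue(status.isDir());
        assertEquals(0, status.getLen());
        FileStatus[] children = fs.listStatus(status.getPath());
        assertTrue(children.length > 0);
        for (FileStatus child : children) {
            if (child.getPath().getName().startsWith("part-")) {
                assertEquals(0, child.getLen());
            }
        }
    }
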
Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Fri Feb 24 03:34:37 2017
@@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc
     private void updatePigProperties(boolean allowErrors, long minErrors,
             double errorThreshold) {
         Properties properties = pigServer.getPigContext().getProperties();
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
+        properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
                 Boolean.toString(allowErrors));
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
+        properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
                 Long.toString(minErrors));
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
+        properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
                 Double.toString(errorThreshold));
     }
 }

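The renamed constants above are the switches for Pig's store-side error tolerance. A hedged sketch of wiring them onto a PigServer, using the constant names as they appear in this branch; the helper name is illustrative, and the parameter semantics are inferred from the parameter names in the test:

    import java.util.Properties;

    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    // Sketch: allowErrors toggles tolerant stores; minErrors and
    // errorThreshold bound how many bad records are tolerated (as the
    // parameter names in the test suggest). Values come from the caller.
    static void configureStoreErrorHandling(PigServer pigServer, boolean allowErrors,
            long minErrors, double errorThreshold) {
        Properties properties = pigServer.getPigContext().getProperties();
        properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS, Boolean.toString(allowErrors));
        properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS, Long.toString(minErrors));
        properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, Double.toString(errorThreshold));
    }
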
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Feb 24 03:34:37 2017
@@ -291,7 +291,7 @@ public class TestEvalPipeline {
             myMap.put("long", new Long(1));
             myMap.put("float", new Float(1.0));
             myMap.put("double", new Double(1.0));
-            myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
+            myMap.put("dba", new DataByteArray(new String("bytes").getBytes()));
             myMap.put("map", mapInMap);
             myMap.put("tuple", tuple);
             myMap.put("bag", bag);
@@ -794,31 +794,32 @@ public class TestEvalPipeline {
     }
 
     @Test
-    public void testMapUDFWithImplicitTypeCast() throws Exception{
+    public void testMapUDFfail() throws Exception{
         int LOOP_COUNT = 2;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
-            ps.println(i);
+            for(int j=0;j<LOOP_COUNT;j+=2){
+                ps.println(i+"\t"+j);
+                ps.println(i+"\t"+j);
+            }
         }
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
-        String query = "C = foreach B generate mymap#'dba' * 10; ";
+        String query = "C = foreach B {"
+                + "generate mymap#'dba' * 10;"
+                + "};";
 
         pigServer.registerQuery(query);
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
-        while(iter.hasNext()){
-            Tuple t = iter.next();
-            Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
-            ++numIdentity;
+        try {
+            pigServer.openIterator("C");
+            Assert.fail("Error expected.");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Cannot determine"));
         }
-        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test

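The rewritten test now expects openIterator to fail, since mymap#'dba' holds a bytearray that cannot be multiplied; note that the message check must be wrapped in an assert so a wrong message actually fails the test. The same idiom can be factored into a helper; a sketch, assuming JUnit 4 (assertFailsWith is a hypothetical name):

    import java.util.concurrent.Callable;

    import org.junit.Assert;

    // Sketch: run the action, demand that it throws, and verify the
    // exception message mentions the expected fragment.
    static void assertFailsWith(Callable<?> action, String messageFragment) {
        try {
            action.call();
            Assert.fail("Error expected.");
        } catch (Exception e) {
            Assert.assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage() != null && e.getMessage().contains(messageFragment));
        }
    }
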
Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Fri Feb 24 03:34:37 2017
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
@@ -37,10 +38,10 @@ import org.apache.pig.impl.builtin.FindQ
 import org.junit.Test;
 
 public class TestFindQuantiles {
-
+    
     private static TupleFactory tFact = TupleFactory.getInstance();
     private static final float epsilon = 0.0001f;
-
+    
     @Test
     public void testFindQuantiles() throws Exception {
        final int numSamples = 97778;
@@ -49,7 +50,7 @@ public class TestFindQuantiles {
        System.out.println("sum: " + sum);
        assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-
+    
     @Test
     public void testFindQuantiles2() throws Exception {
        final int numSamples = 30000;
@@ -85,7 +86,7 @@ public class TestFindQuantiles {
     }
 
     private float[] getProbVec(Tuple values) throws Exception {
-        float[] probVec = new float[values.size()];
+        float[] probVec = new float[values.size()];        
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
         }
@@ -94,7 +95,7 @@ public class TestFindQuantiles {
 
     private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
         Random rand = new Random(1000);
-        List<Tuple> samples = new ArrayList<Tuple>();
+        List<Tuple> samples = new ArrayList<Tuple>(); 
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, rand.nextInt(max));
@@ -105,7 +106,7 @@ public class TestFindQuantiles {
     }
 
     private DataBag generateUniqueSamples(int numSamples) throws Exception {
-        DataBag samples = BagFactory.getInstance().newDefaultBag();
+        DataBag samples = BagFactory.getInstance().newDefaultBag(); 
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
@@ -120,9 +121,9 @@ public class TestFindQuantiles {
 
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
-
+        
         FindQuantiles fq = new FindQuantiles();
-
+        
         Map<String, Object> res = fq.exec(in);
         return res;
     }
@@ -134,11 +135,12 @@ public class TestFindQuantiles {
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());
+        new DiscreteProbabilitySampleGenerator(probVec);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         return sum;
     }
-
+    
 }

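The added DiscreteProbabilitySampleGenerator construction smoke-tests that the weighted-parts vector can actually drive the sampler; the surrounding assertions check the distribution invariant, namely that the vector sums to 1. A minimal sketch of validating such a vector, following the epsilon convention used above (the helper name is illustrative):

    // Illustrative check: non-negative entries that sum to 1 within epsilon.
    static boolean isProbabilityVector(float[] probVec, float epsilon) {
        float sum = 0.0f;
        for (float p : probVec) {
            if (p < 0.0f) {
                return false;
            }
            sum += p;
        }
        return sum > (1 - epsilon) && sum < (1 + epsilon);
    }
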
Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Fri Feb 24 03:34:37 2017
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.PigServer;
-import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.Test;
@@ -106,31 +105,6 @@ public class TestForEachNestedPlanLocal
     }
 
     @Test
-    public void testNestedCrossTwoRelationsLimit() throws Exception {
-        Storage.Data data = Storage.resetData(pig);
-        data.set("input",
-                Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))),
-                Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))),
-                Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2))));
-
-        pig.setBatchOn();
-        pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});");
-        pig.registerQuery("B = foreach A {"
-                + "crossed = cross bag1, bag2;"
-                + "filtered = filter crossed by f1 == f3;"
-                + "lmt = limit filtered 1;"
-                + "generate FLATTEN(lmt);" + "}");
-        pig.registerQuery("store B into 'output' using mock.Storage();");
-
-        pig.executeBatch();
-
-        List<Tuple> actualResults = data.get("output");
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
-    }
-
-    @Test
     public void testNestedCrossTwoRelationsComplex() throws Exception {
         File[] tmpFiles = generateDataSetFilesForNestedCross();
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {

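The deleted testNestedCrossTwoRelationsLimit was built on the mock.Storage pattern, which keeps test input and output in memory instead of on HDFS. For reference, a stripped-down sketch of that round trip; pig is assumed to be the suite's PigServer, and the relation names and (x:int, y:int) schema are illustrative:

    import java.util.List;

    import org.apache.pig.builtin.mock.Storage;
    import org.apache.pig.data.Tuple;

    // Sketch: seed an in-memory relation, run a batch script against it,
    // then read the stored result back without touching the filesystem.
    Storage.Data data = Storage.resetData(pig);
    data.set("input", Storage.tuple(1, 2), Storage.tuple(3, 4));

    pig.setBatchOn();
    pig.registerQuery("A = load 'input' using mock.Storage() as (x:int, y:int);");
    pig.registerQuery("store A into 'output' using mock.Storage();");
    pig.executeBatch();

    List<Tuple> actual = data.get("output");
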
Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Fri Feb 24 03:34:37 2017
@@ -20,7 +20,6 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -51,7 +50,6 @@ public class TestGFCross {
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
-        cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -68,7 +66,6 @@ public class TestGFCross {
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
-        cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 

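With the task-id lines gone, what remains above is the general recipe for unit-testing UDF frontend code: install a Configuration on the thread-local UDFContext before calling the UDF. A hedged sketch, with import paths assumed from the surrounding test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.PigImplConstants;
    import org.apache.pig.impl.util.UDFContext;

    // Sketch: GFCross reads its parallelism hint from the job conf keyed by
    // an operator suffix (".1" here), so the test seeds it up front.
    Configuration cfg = new Configuration();
    cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
    UDFContext.getUDFContext().addJobConf(cfg);
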
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Feb 24 03:34:37 2017
@@ -28,7 +28,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
-import java.io.FilenameFilter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
@@ -971,6 +970,7 @@ public class TestGrunt {
 
     @Test
     public void testStopOnFailure() throws Throwable {
+        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1569,20 +1569,4 @@ public class TestGrunt {
         }
         assertTrue(found);
     }
-
-    @Test
-    public void testGruntUtf8() throws Throwable {
-        String command = "mkdir 测试\n" +
-                "quit\n";
-        System.setProperty("jline.WindowsTerminal.directConsole", "false");
-        System.setIn(new ByteArrayInputStream(command.getBytes()));
-        org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
-        File[] partFiles = new File(".").listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) { 
-            return name.equals("测试");
-        }
-        });
-        assertEquals(partFiles.length, 1);
-        new File("测试").delete();
-    }
 }

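The Assume line added to testStopOnFailure is JUnit 4's mechanism for skipping, rather than failing, a test whose precondition does not hold; that matters when one suite runs under MapReduce, Tez and Spark. A small sketch (the test name is illustrative; Util and cluster come from this suite):

    import org.junit.Assume;
    import org.junit.Test;

    @Test
    public void mapreduceOnlyBehavior() throws Exception {
        // A false condition marks the test as skipped, so non-MR runs stay green.
        Assume.assumeTrue("Skip this test for TEZ",
                Util.isMapredExecType(cluster.getExecType()));
        // MapReduce-specific assertions would follow here.
    }
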
Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Feb 24 03:34:37 2017
@@ -71,16 +71,12 @@ public class TestHBaseStorage {
     private static final String TESTTABLE_1 = "pigtable_1";
     private static final String TESTTABLE_2 = "pigtable_2";
     private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
-    private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2");
     private static final String TESTCOLUMN_A = "pig:col_a";
     private static final String TESTCOLUMN_B = "pig:col_b";
     private static final String TESTCOLUMN_C = "pig:col_c";
 
     private static final int TEST_ROW_COUNT = 100;
 
-    private enum TableType {ONE_CF, TWO_CF};
-    private TableType lastTableType;
-
     @BeforeClass
     public static void setUp() throws Exception {
         // This is needed by Pig
@@ -317,13 +313,13 @@ public class TestHBaseStorage {
      */
     @Test
     public void testLoadWithMap_3_col_prefix() throws IOException {
-        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF);
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
 
         pig.registerQuery("a = load 'hbase://"
                 + TESTTABLE_1
                 + "' using "
                 + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-                + "pig2:* pig:prefixed_col_*"
+                + "pig:col_* pig:prefixed_col_*"
                 + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
         Iterator<Tuple> it = pig.openIterator("a");
         int count = 0;
@@ -332,18 +328,24 @@ public class TestHBaseStorage {
             Tuple t = it.next();
             LOG.info("LoadFromHBase " + t);
             String rowKey = t.get(0).toString();
-            Map pig_secondery_cf_map = (Map) t.get(1);
+            Map pig_cf_map = (Map) t.get(1);
             Map pig_prefix_cf_map = (Map) t.get(2);
             Assert.assertEquals(3, t.size());
 
             Assert.assertEquals("00".substring((count + "").length()) + count,
                     rowKey);
-            Assert.assertEquals(count,
-                    Integer.parseInt(pig_secondery_cf_map.get("col_x").toString()));
             Assert.assertEquals("PrefixedText_" + count,
                     ((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
             Assert.assertEquals(1, pig_prefix_cf_map.size());
 
+            Assert.assertEquals(count,
+                    Integer.parseInt(pig_cf_map.get("col_a").toString()));
+            Assert.assertEquals(count + 0.0,
+                    Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+            Assert.assertEquals("Text_" + count,
+                    ((DataByteArray) pig_cf_map.get("col_c")).toString());
+            Assert.assertEquals(3, pig_cf_map.size());
+
             count++;
         }
         Assert.assertEquals(TEST_ROW_COUNT, count);
@@ -432,39 +434,6 @@ public class TestHBaseStorage {
         LOG.info("LoadFromHBase done");
     }
 
-    public void testLoadWithFixedAndPrefixedCols3() throws IOException {
-        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
-
-        pig.registerQuery("a = load 'hbase://"
-                + TESTTABLE_1
-                + "' using "
-                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-                + "pig:* pig:prefixed_col_*"
-                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
-        Iterator<Tuple> it = pig.openIterator("a");
-        int count = 0;
-        LOG.info("LoadFromHBase Starting");
-        while (it.hasNext()) {
-            Tuple t = it.next();
-            LOG.info("LoadFromHBase " + t);
-            String rowKey = (String) t.get(0);
-            Map pig_cf_map = (Map) t.get(1);
-            Map pig_prefix_cf_map = (Map) t.get(2);
-            Assert.assertEquals(3, t.size());
-
-            Assert.assertEquals("00".substring((count + "").length()) + count,
-                    rowKey);
-            Assert.assertEquals("PrefixedText_" + count,
-                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
-            Assert.assertEquals(1, pig_cf_map.size());
-            Assert.assertEquals(1, pig_prefix_cf_map.size());
-
-            count++;
-        }
-        Assert.assertEquals(TEST_ROW_COUNT, count);
-        LOG.info("LoadFromHBase done");
-    }
-
     /**
      * Test Load from hbase with map parameters and with a
      * static column in different order
@@ -1517,36 +1486,22 @@ public class TestHBaseStorage {
                 + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
     }
 
-    private HTable prepareTable(String tableName, boolean initData,
-            DataFormat format) throws IOException {
-        return prepareTable(tableName, initData, format, TableType.ONE_CF);
-    }
     /**
      * Prepare a table in hbase for testing.
      *
      */
     private HTable prepareTable(String tableName, boolean initData,
-            DataFormat format, TableType type) throws IOException {
+            DataFormat format) throws IOException {
         // define the table schema
         HTable table = null;
         try {
-            if (lastTableType == type) {
-                deleteAllRows(tableName);
-            } else {
-                util.deleteTable(tableName);
-            }
+            deleteAllRows(tableName);
         } catch (Exception e) {
             // It's ok, table might not exist.
         }
         try {
-            if (type == TableType.TWO_CF) {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
-                        new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
-            } else {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
-                        COLUMNFAMILY);
-            }
-            lastTableType = type;
+            table = util.createTable(Bytes.toBytesBinary(tableName),
+                    COLUMNFAMILY);
         } catch (Exception e) {
             table = new HTable(conf, Bytes.toBytesBinary(tableName));
         }
@@ -1573,11 +1528,6 @@ public class TestHBaseStorage {
                     // prefixed_col_d: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             Bytes.toBytes("PrefixedText_" + i));
-                    // another cf
-                    if (type == TableType.TWO_CF) {
-                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
-                                Bytes.toBytes(i));
-                    }
                     table.put(put);
                 } else {
                     // row key: string type
@@ -1598,11 +1548,6 @@ public class TestHBaseStorage {
                     // prefixed_col_d: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             ("PrefixedText_" + i).getBytes());
-                    // another cf
-                    if (type == TableType.TWO_CF) {
-                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
-                                (i + "").getBytes());
-                    }
                     table.put(put);
                 }
             }

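The simplification above restores the single-column-family shape of prepareTable: empty the table if it already exists, otherwise create it. A condensed sketch of that truncate-or-create pattern, where util, conf, COLUMNFAMILY and deleteAllRows all come from the surrounding test class:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: prefer clearing rows over dropping the table; fall back to
    // opening the existing table if creation fails because it already exists.
    private HTable prepareTable(String tableName) throws IOException {
        try {
            deleteAllRows(tableName);
        } catch (Exception e) {
            // ok, the table might not exist yet
        }
        try {
            return util.createTable(Bytes.toBytesBinary(tableName), COLUMNFAMILY);
        } catch (Exception e) {
            return new HTable(conf, Bytes.toBytesBinary(tableName));
        }
    }
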
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Fri Feb 24 03:34:37 2017
@@ -63,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -130,7 +131,7 @@ public class TestJobControlCompiler {
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
     // guava jar is not shipped with Hadoop 2.x
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hostname in front
@@ -234,12 +235,22 @@ public class TestJobControlCompiler {
           // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
           System.out.println("cache.files= " + Arrays.toString(cacheURIs));
           System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
-          // Default jars - 5 (pig, antlr, joda-time, automaton)
-          // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
-          Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
-                  Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-          Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
-                  Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+          if (HadoopShims.isHadoopYARN()) {
+              // Default jars - 4 (pig, antlr, joda-time, automaton)
+              // Other jars - 5 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar)
+              Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+              Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+          } else {
+              // Default jars - 5 (guava in addition to the 4 above)
+              // The udf.jar and udf2.jar entries appear twice each
+              Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
+                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+              Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
+                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+          }
+
           // Count occurrences of the resources
           Map<String, Integer> occurrences = new HashMap<String, Integer>();
 
@@ -248,12 +259,22 @@ public class TestJobControlCompiler {
               val = (val == null) ? 1 : ++val;
               occurrences.put(cacheURI.toString(), val);
           }
-          Assert.assertEquals(9, occurrences.size());
+          if (HadoopShims.isHadoopYARN()) {
+              Assert.assertEquals(9, occurrences.size());
+          } else {
+              Assert.assertEquals(10, occurrences.size()); //guava jar in addition
+          }
 
           for (String file : occurrences.keySet()) {
-              // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
-              // and second time through pig register jar when there is symlink
-              Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+              if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
+                  // Same path added twice, which is ok. It should not be shipped to an hdfs temp path.
+                  // We assert the path is the same by checking the count.
+                  Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
+              } else {
+                  // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+                  // and second time through pig register jar when there is symlink
+                  Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+              }
           }
       }
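
The YARN/non-YARN branches above all reduce to an occurrence count over the distributed-cache entries: on YARN every resource should appear exactly once, while pre-YARN Hadoop legitimately lists udf.jar and udf2.jar twice. A compact sketch of the counting idiom, assuming a cacheURIs array as in the test:

    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch: tally each cache URI, then assert per-entry expectations
    // against the tally.
    Map<String, Integer> occurrences = new HashMap<String, Integer>();
    for (URI cacheURI : cacheURIs) {
        Integer val = occurrences.get(cacheURI.toString());
        occurrences.put(cacheURI.toString(), (val == null) ? 1 : val + 1);
    }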