You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/09/14 20:28:40 UTC

svn commit: r1760750 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ src/org/apache/pig/tools/pigstats/tez/ test/org/apache/pig/test/

Author: rohini
Date: Wed Sep 14 20:28:40 2016
New Revision: 1760750

URL: http://svn.apache.org/viewvc?rev=1760750&view=rev
Log:
PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/trunk/test/org/apache/pig/test/TestCounters.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1760750&r1=1760749&r2=1760750&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 14 20:28:40 2016
@@ -46,6 +46,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)
+
 PIG-5031: Tez failing to compile when replicate join is done with a limit vertex on left (knoguchi)
 
 PIG-5019: Pig generates tons of warnings for udf with enabled warnings aggregation (murshyd via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1760750&r1=1760749&r2=1760750&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Wed Sep 14 20:28:40 2016
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
             throw new ExecException(e);
         }
 
-        // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
-        if (outputs.size() > 1) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
-            }
+        // Even if there is only a single HDFS output, we add a multi-store counter.
+        // This makes it easier for the user to see the record count for a
+        // particular store from the DAG counters.
+        CounterGroup multiStoreGroup = processorContext.getCounters()
+                .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (multiStoreGroup == null) {
+            processorContext.getCounters().addGroup(
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        }
+        String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+        if (name != null) {
+            outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1760750&r1=1760749&r2=1760750&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Sep 14 20:28:40 2016
@@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat
                             OutputStats existingOut = outputsByLocation.get(output.getLocation());
                             // In case of multistore, bytesWritten is already calculated
                             // from size of all the files in the output directory.
-                            if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+                            // So use that value if there is a combination of multistore and single-store outputs
+                            if (output.getPOStore().isMultiStore()) {
+                                existingOut.setBytes(output.getBytes());
+                                existingOut.setPOStore(output.getPOStore());
+                            } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) {
                                 long bytes = existingOut.getBytes() > -1
                                         ? (existingOut.getBytes() + output.getBytes())
                                         : output.getBytes();

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1760750&r1=1760749&r2=1760750&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Sep 14 20:28:40 2016
@@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst
 import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -289,13 +290,19 @@ public class TezVertexStats extends JobS
         }
 
         // Split followed by union will have multiple stores writing to same location
-        Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
+        Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>();
         for (POStore sto : stores) {
             POStoreTez store = (POStoreTez) sto;
-            uniqueOutputs.put(store.getOutputKey(), store);
+            List<POStore> stores = uniqueOutputs.get(store.getOutputKey());
+            if (stores == null) {
+                stores = new ArrayList<POStore>();
+            }
+            stores.add(store);
+            uniqueOutputs.put(store.getOutputKey(), stores);
         }
 
-        for (POStore sto : uniqueOutputs.values()) {
+        for (List<POStore> stores : uniqueOutputs.values()) {
+            POStore sto = stores.get(0);
             if (sto.isTmpStore()) {
                 continue;
             }
@@ -304,11 +311,16 @@ public class TezVertexStats extends JobS
             String filename = sto.getSFile().getFileName();
             if (counters != null) {
                 if (msGroup != null) {
-                    Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
-                    if (n != null) records = n;
-                }
-                if (records == -1) {
-                    records = outputRecords;
+                    long n = 0;
+                    Long val = null;
+                    for (POStore store : stores) {
+                        val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store));
+                        // Tez removes 0 value counters for efficiency.
+                        if (val != null) {
+                            n += val;
+                        };
+                    }
+                    records = n;
                 }
                 if (isSuccessful() && records == -1) {
                     // Tez removes 0 value counters for efficiency.
@@ -338,13 +350,13 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public int getNumberMaps() {
-        return this.isMapOpts ? numTasks : -1;
+        return this.isMapOpts ? numTasks : 0;
     }
 
     @Override
     @Deprecated
     public int getNumberReduces() {
-        return this.isMapOpts ? -1 : numTasks;
+        return this.isMapOpts ? 0 : numTasks;
     }
 
     @Override
@@ -386,25 +398,25 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public long getMapInputRecords() {
-        return this.isMapOpts ? numInputRecords : -1;
+        return this.isMapOpts ? numInputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getMapOutputRecords() {
-        return this.isMapOpts ? numOutputRecords : -1;
+        return this.isMapOpts ? numOutputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getReduceInputRecords() {
-        return this.isMapOpts ? -1 : numInputRecords;
+        return numReduceInputRecords;
     }
 
     @Override
     @Deprecated
     public long getReduceOutputRecords() {
-        return this.isMapOpts ? -1 : numOutputRecords;
+        return this.isMapOpts ? 0 : numOutputRecords;
     }
 
     @Override

Modified: pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=1760750&r1=1760749&r2=1760750&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Sep 14 20:28:40 2016
@@ -30,17 +30,17 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
 public class TestCounters {
     String file = "input.txt";
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
     final int MAX = 100*1000;
     Random r = new Random();
 
@@ -59,7 +59,7 @@ public class TestCounters {
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     @Test
     public void testMapOnly() throws IOException, ExecException {
         int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
             if(t > 50) count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only");
         PigStats pigStats = job.getStatistics();
-        
+
         //counting the no. of bytes in the output file
         //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output_map_only"), true);
 
@@ -98,7 +98,7 @@ public class TestCounters {
         JobGraph jg = pigStats.getJobGraph();
         Iterator<JobStats> iter = jg.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();                    
+            JobStats js = iter.next();
 
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
                 count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
                 "output_map_only", pigServer.getPigContext()),
                 pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
-        
+            JobStats js = iter.next();
+
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
             assertEquals(0, js.getReduceInputRecords());
             assertEquals(0, js.getReduceOutputRecords());
         }
-            
+
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
@@ -183,7 +183,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
@@ -266,7 +266,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
- 
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-     
+
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
         int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
 
         ExecJob job = pigServer.store("c", "output", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
- 
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,6 +399,8 @@ public class TestCounters {
 
     @Test
     public void testMultipleMRJobs() throws IOException, ExecException {
+        Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
+                Util.isMapredExecType(cluster.getExecType()));
         int count = 0;
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
         int [] nos = new int[10];
@@ -413,38 +415,38 @@ public class TestCounters {
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) { 
+        for(int i = 0; i < 10; i++) {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
         ExecJob job = pigServer.store("d", "output");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
-        
+
         System.out.println("============================================");
         System.out.println("Test case MultipleMRJobs");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
-        MRJobStats js = (MRJobStats)jp.getSinks().get(0);
-        
+        JobStats js = (JobStats)jp.getSinks().get(0);
+
         System.out.println("Job id: " + js.getName());
         System.out.println(jp.toString());
-        
+
         System.out.println("Map input records : " + js.getMapInputRecords());
         assertEquals(MAX, js.getMapInputRecords());
         System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -453,12 +455,12 @@ public class TestCounters {
         assertEquals(count, js.getReduceInputRecords());
         System.out.println("Reduce output records : " + js.getReduceOutputRecords());
         assertEquals(count, js.getReduceOutputRecords());
-        
+
         System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
         assertEquals(filesize, js.getHdfsBytesWritten());
 
     }
-    
+
     @Test
     public void testMapOnlyMultiQueryStores() throws Exception {
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -467,8 +469,8 @@ public class TestCounters {
             pw.println(t);
         }
         pw.close();
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -479,22 +481,22 @@ public class TestCounters {
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
         assertTrue(stats.getOutputLocations().size() == 2);
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(MAX, counter);       
-    }    
-    
+
+        assertEquals(MAX, counter);
+    }
+
     @Test
     public void testMultiQueryStores() throws Exception {
         int[] nums = new int[100];
@@ -505,13 +507,13 @@ public class TestCounters {
             nums[t]++;
         }
         pw.close();
-        
+
         int groups = 0;
         for (int i : nums) {
             if (i > 0) groups++;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -525,29 +527,29 @@ public class TestCounters {
         pigServer.registerQuery("store g into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
-        
+
         assertTrue(stats.getOutputLocations().size() == 2);
-               
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(groups, counter);       
-    }    
-    
-    /*    
+
+        assertEquals(groups, counter);
+    }
+
+    /*
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
      * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
-     */ 
+     */
 //    @Test
 //    public void testLocal() throws IOException, ExecException {
 //        int count = 0;
@@ -566,7 +568,7 @@ public class TestCounters {
 //        }
 //        pw.close();
 //
-//        for(int i = 0; i < 10; i++) 
+//        for(int i = 0; i < 10; i++)
 //            if(nos[i] > 0)
 //                count ++;
 //
@@ -580,56 +582,56 @@ public class TestCounters {
 //        pigServer.registerQuery("c = group b by $0;");
 //        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
 //        PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
 //        long filesize = 0;
 //        while(is.read() != -1) filesize++;
-//        
+//
 //        is.close();
 //        out.delete();
-//        
+//
 //        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//        
+//
 //        assertEquals(10, pigStats.getRecordsWritten());
 //        assertEquals(110, pigStats.getBytesWritten());
 //
 //    }
 
     @Test
-    public void testJoinInputCounters() throws Exception {        
+    public void testJoinInputCounters() throws Exception {
         testInputCounters("join");
     }
-    
+
     @Test
-    public void testCogroupInputCounters() throws Exception {        
+    public void testCogroupInputCounters() throws Exception {
         testInputCounters("cogroup");
     }
-    
+
     @Test
-    public void testSkewedInputCounters() throws Exception {        
+    public void testSkewedInputCounters() throws Exception {
         testInputCounters("skewed");
     }
-    
+
     @Test
-    public void testSelfJoinInputCounters() throws Exception {        
+    public void testSelfJoinInputCounters() throws Exception {
         testInputCounters("self-join");
     }
-    
+
     private static boolean multiInputCreated = false;
-    
+
     private static int count = 0;
-            
-    private void testInputCounters(String keyword) throws Exception {  
+
+    private void testInputCounters(String keyword) throws Exception {
         String file1 = "multi-input1.txt";
         String file2 = "multi-input2.txt";
-        
+
         String output = keyword;
-        
+
         if (keyword.equals("self-join")) {
             file2 = file1;
             keyword = "join";
         }
-         
-        final int MAX_NUM_RECORDS = 100; 
+
+        final int MAX_NUM_RECORDS = 100;
         if (!multiInputCreated) {
             PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -637,7 +639,7 @@ public class TestCounters {
                 pw.println(t);
             }
             pw.close();
-                        
+
             PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
                 int t = r.nextInt(100);
@@ -649,8 +651,8 @@ public class TestCounters {
             pw2.close();
             multiInputCreated = true;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file1 + "';");
@@ -661,7 +663,7 @@ public class TestCounters {
             pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
         }
         ExecJob job = pigServer.store("c", output + "_output");
-        
+
         PigStats stats = job.getStatistics();
         assertTrue(stats.isSuccessful());
         List<InputStats> inputs = stats.getInputStats();
@@ -680,4 +682,46 @@ public class TestCounters {
             }
         }
     }
+
+    @Test
+    public void testSplitUnionOutputCounters() throws Exception {
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
+        for (int i = 0; i < 10; i++) {
+            pw.println(i);
+        }
+        pw.close();
+        String query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 5, c otherwise;" +
+                "d = union b, c;";
+
+        pigServer.registerQuery(query);
+
+        ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
+        PigStats stats1 = job.getStatistics();
+
+        query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
+                "e = distinct d;" +
+                "f = union b, c, e;";
+
+        pigServer.registerQuery(query);
+
+        job = pigServer.store("f", "splitunion-output-1", "PigStorage");
+        PigStats stats2 = job.getStatistics();
+
+        PigStats[] pigStats = new PigStats[]{stats1, stats2};
+        for (int i = 0; i < 2; i++) {
+            PigStats stats = pigStats[i];
+            assertTrue(stats.isSuccessful());
+            List<OutputStats> outputs = stats.getOutputStats();
+            assertEquals(1, outputs.size());
+            OutputStats output = outputs.get(0);
+            assertEquals("splitunion-output-" + i, output.getName());
+            assertEquals(10, output.getNumberRecords());
+            assertEquals(20, output.getBytes());
+        }
+    }
 }