You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/09/27 19:49:49 UTC

svn commit: r1176491 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/

Author: daijy
Date: Tue Sep 27 17:49:49 2011
New Revision: 1176491

URL: http://svn.apache.org/viewvc?rev=1176491&view=rev
Log:
PIG-2208: Restrict number of PIG generated Hadoop counters

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 27 17:49:49 2011
@@ -259,6 +259,8 @@ PIG-2221: Couldnt find documentation for
 
 BUG FIXES
 
+PIG-2208: Restrict number of PIG generated Hadoop counters (rding via daijy)
+
 PIG-2299: jetty 6.1.14 startup issue causes unit tests to fail in CI (thw via daijy)
 
 PIG-2301: Some more bin/pig, build.xml cleanup for 0.9.1 (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Sep 27 17:49:49 2011
@@ -500,9 +500,8 @@ public class JobControlCompiler{
                     .getTemporaryPath(pigContext).toString();
                     tmpLocation = new Path(tmpLocationStr);
                     conf.set("pig.streaming.log.dir",
-                            new Path(tmpLocation, LOG_DIR).toString());
-                
-            } 
+                            new Path(tmpLocation, LOG_DIR).toString());            
+                } 
                 conf.set("pig.streaming.task.output.dir", outputPathString);
             }
            else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
@@ -513,8 +512,13 @@ public class JobControlCompiler{
 
                 nwJob.setOutputFormatClass(PigOutputFormat.class);
                 
+                boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+                if (disableCounter) {
+                    log.info("Disable Pig custom output counters");
+                }
                 int idx = 0;
                 for (POStore sto: storeLocations) {
+                    sto.setDisableCounter(disableCounter);
                     sto.setMultiStore(true);
                     sto.setIndex(idx++);
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Tue Sep 27 17:49:49 2011
@@ -292,9 +292,16 @@ public class PigInputFormat extends Inpu
         // also passing the multi-input flag to the back-end so that 
         // the multi-input record counters can be created 
         int m = inputs.size();        
+        
+        boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+        if ((m > 1) && disableCounter) {
+            log.info("Disable Pig custom input counters");
+        }
+        
         for (InputSplit split : splits) {
             ((PigSplit) split).setTotalSplits(n);
             if (m > 1) ((PigSplit) split).setMultiInputs(true);
+            ((PigSplit) split).setDisableCounter(disableCounter);
         }
         
         return splits;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Tue Sep 27 17:49:49 2011
@@ -177,7 +177,7 @@ public class PigRecordReader extends Rec
             loadfunc.prepareToRead(curReader, pigSplit);
         }
                 
-        if (pigSplit.isMultiInputs()) { 
+        if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) { 
             counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
         }
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Tue Sep 27 17:49:49 2011
@@ -86,6 +86,12 @@ public class PigSplit extends InputSplit
     // back-end to track the number of records for each input.
     private boolean isMultiInputs = false;
     
+    // the flag indicates the custom Hadoop counter should be disabled.
+    // This is to prevent the number of counters exceeding the limit.
+    // This flag is controlled by Pig property "pig.disable.counter" (
+    // the default value is 'false').
+    private boolean disableCounter = false;
+    
     /**
      * the job Configuration
      */
@@ -202,6 +208,7 @@ public class PigSplit extends InputSplit
     
     @SuppressWarnings("unchecked")
     public void readFields(DataInput is) throws IOException {
+        disableCounter = is.readBoolean();
         isMultiInputs = is.readBoolean();
         totalSplits = is.readInt();
         splitIndex = is.readInt();
@@ -229,6 +236,7 @@ public class PigSplit extends InputSplit
 
     @SuppressWarnings("unchecked")
     public void write(DataOutput os) throws IOException {
+        os.writeBoolean(disableCounter);
         os.writeBoolean(isMultiInputs);
         os.writeInt(totalSplits);
         os.writeInt(splitIndex);
@@ -374,4 +382,12 @@ public class PigSplit extends InputSplit
         }
         return st.toString();
     }
+
+    public void setDisableCounter(boolean disableCounter) {
+        this.disableCounter = disableCounter;
+    }
+
+    public boolean disableCounter() {
+        return disableCounter;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Tue Sep 27 17:49:49 2011
@@ -66,6 +66,9 @@ public class POStore extends PhysicalOpe
     // flag to distinguish single store from multiquery store.
     private boolean isMultiStore;
     
+    // flag to indicate if the custom counter should be disabled.
+    private boolean disableCounter = false;
+    
     // the index of multiquery store to track counters
     private int index;
     
@@ -101,7 +104,7 @@ public class POStore extends PhysicalOpe
         if (impl != null) {
             try{
                 storer = impl.createStoreFunc(this);
-                if (!isTmpStore && impl instanceof MapReducePOStoreImpl) {
+                if (!isTmpStore && !disableCounter && impl instanceof MapReducePOStoreImpl) {
                     outputRecordCounter = 
                         ((MapReducePOStoreImpl) impl).createRecordCounter(this);
                 }
@@ -282,4 +285,12 @@ public class POStore extends PhysicalOpe
     public int getIndex() {
         return index;
     }
+
+    public void setDisableCounter(boolean disableCounter) {
+        this.disableCounter = disableCounter;
+    }
+
+    public boolean disableCounter() {
+        return disableCounter;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Tue Sep 27 17:49:49 2011
@@ -96,7 +96,7 @@ public final class InputStats {
                 sb.append("read ");
             }
             
-            if (!local) {
+            if (!local && records >= 0) {
                 sb.append(records).append(" records ");
             } else {
                 sb.append("records ");

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Tue Sep 27 17:49:49 2011
@@ -95,6 +95,8 @@ public final class JobStats extends Oper
     private String errorMsg;
     
     private Exception exception = null;
+    
+    private Boolean disableCounter = false;
             
     @SuppressWarnings("deprecation")
     private JobID jobId;
@@ -270,6 +272,7 @@ public final class JobStats extends Oper
                     .get(JobControlCompiler.PIG_REDUCE_STORES));           
             loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
                     .get("pig.inputs"));
+            disableCounter = conf.getBoolean("pig.disable.counter", false);
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }                    
@@ -545,13 +548,17 @@ public final class JobStats extends Oper
     }
     
     private void addOneInputStats(String fileName, int index) {
-        long records = 0;
+        long records = -1;
         Long n = multiInputCounters.get(
                 PigStatsUtil.getMultiInputsCounterName(fileName, index));
         if (n != null) {   
             records = n;
         } else {
-            LOG.warn("unable to get input counter for " + fileName);
+            // the file could be empty
+            if (!disableCounter) records = 0;
+            else {
+                LOG.warn("unable to get input counter for " + fileName);
+            }
         }
         InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));              
         is.setConf(conf);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Tue Sep 27 17:49:49 2011
@@ -111,7 +111,7 @@ public final class OutputStats {
         StringBuilder sb = new StringBuilder();
         if (success) {
             sb.append("Successfully stored ");
-            if (!local) {
+            if (!local && records >= 0) {
                 sb.append(records).append(" records ");
             } else {
                 sb.append("records ");

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Tue Sep 27 17:49:49 2011
@@ -738,6 +738,74 @@ public class TestPigRunner {
         }
     }
     
+    @Test // PIG-2208: Restrict number of PIG generated Hadoop counters
+    public void testDisablePigCounters() throws Exception {        
+        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
+        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w1.println("B = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w1.println("C = join A by a0, B by a0;");
+        w1.println("store C into '" + OUTPUT_FILE + "';");
+        w1.close();
+        
+        try {
+            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+     
+            assertTrue(stats.isSuccessful());
+            
+            assertEquals(1, stats.getNumberJobs());
+            List<InputStats> inputs = stats.getInputStats();
+            assertEquals(2, inputs.size());
+            for (InputStats instats : inputs) {
+                // the multi-input counters are disabled
+                assertEquals(-1, instats.getNumberRecords());
+            }
+            
+            List<OutputStats> outputs = stats.getOutputStats();
+            assertEquals(1, outputs.size());
+            OutputStats outstats = outputs.get(0);
+            assertEquals(9, outstats.getNumberRecords());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+    
+    @Test // PIG-2208: Restrict number of PIG generated Hadoop counters
+    public void testDisablePigCounters2() throws Exception {
+        
+        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
+        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w1.println("B = filter A by a0 > 3;");
+        w1.println("store A into 'output';");
+        w1.println("store B into 'tmp/output';");
+        w1.close();
+        
+        try {
+            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+     
+            assertTrue(stats.isSuccessful());
+            
+            assertEquals(1, stats.getNumberJobs());
+            List<OutputStats> outputs = stats.getOutputStats();
+            assertEquals(2, outputs.size());
+            for (OutputStats outstats : outputs) {
+                // the multi-output counters are disabled
+                assertEquals(-1, outstats.getNumberRecords());
+            }
+            
+            List<InputStats> inputs = stats.getInputStats();
+            assertEquals(1, inputs.size());
+            InputStats instats = inputs.get(0);
+            assertEquals(5, instats.getNumberRecords());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "tmp/output");
+        }
+    }
+    
     public static class TestNotificationListener implements PigProgressNotificationListener {
         
         private Map<String, int[]> numMap = new HashMap<String, int[]>();