You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/19 02:07:37 UTC

svn commit: r986989 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/data/ src/org/apache/pig/tools/pigstats/

Author: thejas
Date: Thu Aug 19 00:07:36 2010
New Revision: 986989

URL: http://svn.apache.org/viewvc?rev=986989&view=rev
Log:
PIG-1524:'Proactive spill count' is misleading (thejas)

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigCounters.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 19 00:07:36 2010
@@ -132,6 +132,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1524: 'Proactive spill count' is misleading (thejas)
+
 PIG-1546: Incorrect assert statements in operator evaluation (ajaykidave via
 pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigCounters.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigCounters.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigCounters.java Thu Aug 19 00:07:36 2010
@@ -23,5 +23,10 @@ package org.apache.pig;
  */
 public enum PigCounters {
     SPILLABLE_MEMORY_MANAGER_SPILL_COUNT,
-    PROACTIVE_SPILL_COUNT;
+    
+    // total number of bags that have spilled proactively
+    PROACTIVE_SPILL_COUNT_BAGS, 
+    
+    //total number of records that have been spilled to disk
+    PROACTIVE_SPILL_COUNT_RECS;
 }
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Aug 19 00:07:36 2010
@@ -380,9 +380,13 @@ public abstract class DefaultAbstractBag
     }
     
     protected void incSpillCount(Enum counter) {
+        incSpillCount(counter, 1);
+    }
+
+    protected void incSpillCount(Enum counter, long numRecsSpilled) {
         PigStatusReporter reporter = PigStatusReporter.getInstance();
         if (reporter != null && reporter.getCounter(counter)!=null) {
-            reporter.getCounter(counter).increment(1);
+            reporter.getCounter(counter).increment(numRecsSpilled);
         } else {
             PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Aug 19 00:07:36 2010
@@ -17,13 +17,22 @@
  */
 package org.apache.pig.data;
 
-import java.io.*;
-import java.util.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
 
@@ -37,6 +46,9 @@ public class InternalCachedBag extends D
     private transient DataOutputStream out;
     private transient boolean addDone;
     private transient TupleFactory factory;
+
+    // used to store number of tuples spilled until counter is incremented
+    private transient int numTuplesSpilled = 0; 
  
     public InternalCachedBag() {
         this(1);
@@ -93,19 +105,21 @@ public class InternalCachedBag extends D
                 }
             }
         } else {
+            // above cacheLimit, spill to disk
             try {
                 if(out == null) {
                 	if (log.isDebugEnabled()) {
                 		log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file.");
                 	}
                     out = getSpillFile();
-
+                    incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
                 }
                 t.write(out);
                 
-                if (cacheLimit!= 0 && mContents.size() % cacheLimit == 0) {
-                    /* Increment the spill count*/
-                    incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);                    
+                //periodically update number of tuples spilled 
+                numTuplesSpilled++;
+                if(numTuplesSpilled > 1000){
+                    updateSpillRecCounter();
                 }
             }
             catch(IOException e) {
@@ -116,6 +130,11 @@ public class InternalCachedBag extends D
         mSize++;
     }
 
+    private void updateSpillRecCounter() {
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, numTuplesSpilled);
+        numTuplesSpilled = 0;
+    }
+
     public void addAll(DataBag b) {
     	Iterator<Tuple> iter = b.iterator();
     	while(iter.hasNext()) {
@@ -140,6 +159,8 @@ public class InternalCachedBag extends D
             	// ignore
             }
         }
+        if(numTuplesSpilled > 0)
+            updateSpillRecCounter();
         addDone = true;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Aug 19 00:07:36 2010
@@ -54,7 +54,7 @@ import org.apache.pig.backend.hadoop.exe
  * This bag spills pro-actively when the number of tuples in memory 
  * reaches a limit
  */
-public class InternalDistinctBag extends DefaultAbstractBag {
+public class InternalDistinctBag extends SortedSpillBag {
 
     /**
      * 
@@ -70,7 +70,7 @@ public class InternalDistinctBag extends
     private transient int cacheLimit;
     private transient long maxMemUsage;
     private transient long memUsage;    
-
+    
     public InternalDistinctBag() {
         this(1, -1.0);
     }
@@ -145,7 +145,7 @@ public class InternalDistinctBag extends
         }
                 
     	if (mContents.size() > cacheLimit) {    		
-    		spill();
+    		proactive_spill(null);
     	}
     	            	
         if (mContents.add(t)) {
@@ -178,61 +178,8 @@ public class InternalDistinctBag extends
     	}
     }
     
-    public long spill() {       	
-        // Make sure we have something to spill.  Don't create empty
-        // files, as that will make a mess.
-        if (mContents.size() == 0) return 0;
-
-        // Lock the container before I spill, so that iterators aren't
-        // trying to read while I'm mucking with the container.
-        long spilled = 0;
-       
-        DataOutputStream out = null;
-        try {
-            out = getSpillFile();
-        }  catch (IOException ioe) {
-            // Do not remove last file from spilled array. It was not
-            // added as File.createTmpFile threw an IOException
-            warn(
-                "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
-            return 0;
-        }
-        try {            
-        	Tuple[] array = new Tuple[mContents.size()];
-            mContents.toArray(array);
-            Arrays.sort(array);
-            for (int i = 0; i < array.length; i++) {
-                array[i].write(out);
-                spilled++;
-                // This will spill every 16383 records.
-                if ((spilled & 0x3fff) == 0) reportProgress();
-            }
-            
-            out.flush();
-        } catch (IOException ioe) {
-            // Remove the last file from the spilled array, since we failed to
-            // write to it.
-            mSpillFiles.remove(mSpillFiles.size() - 1);
-            warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
-            return 0;
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException e) {
-                    warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
-                }
-            }
-        }
-        mContents.clear();
-        memUsage = 0;
-
-        // Increment the spill count
-        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
-        return spilled;
-    }
-
+    
+   
     /**
      * An iterator that handles getting the next tuple from the bag.
      * Data can be stored in a combination of in memory and on disk.  
@@ -533,4 +480,9 @@ public class InternalDistinctBag extends
             }
         }
     }
+
+    public long spill(){
+        return proactive_spill(null);
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Aug 19 00:07:36 2010
@@ -18,9 +18,7 @@
 package org.apache.pig.data;
 
 import java.io.BufferedInputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
@@ -35,12 +33,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-  
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
 
@@ -56,7 +51,7 @@ import org.apache.pig.backend.hadoop.exe
  * This bag is not registered with SpillableMemoryManager. It calculates
  * the number of tuples to hold in memory and spill pro-actively into files.
  */
-public class InternalSortedBag extends DefaultAbstractBag{
+public class InternalSortedBag extends SortedSpillBag{
 
     /**
      * 
@@ -142,7 +137,7 @@ public class InternalSortedBag extends D
         }
                 
     	if (mContents.size() > cacheLimit) {    		
-    		spill();
+    		proactive_spill(mComp);
     	}
     	        
         mContents.add(t);
@@ -187,61 +182,6 @@ public class InternalSortedBag extends D
         return new SortedDataBagIterator();
     }
 
-    public long spill() {
-        // Make sure we have something to spill.  Don't create empty
-        // files, as that will make a mess.
-        if (mContents.size() == 0) return 0;
-
-        // Lock the container before I spill, so that iterators aren't
-        // trying to read while I'm mucking with the container.
-        long spilled = 0;
-        
-        DataOutputStream out = null;
-        try {
-            out = getSpillFile();
-        } catch (IOException ioe) {
-            // Do not remove last file from spilled array. It was not
-            // added as File.createTmpFile threw an IOException
-            warn(
-                "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
-            return 0;
-        }
-        try {
-            
-            Collections.sort((ArrayList<Tuple>)mContents, mComp);
-            
-            Iterator<Tuple> i = mContents.iterator();
-            while (i.hasNext()) {
-                i.next().write(out);
-                spilled++;
-                // This will spill every 16383 records.
-                if ((spilled & 0x3fff) == 0) reportProgress();
-            }
-            out.flush();
-        } catch (IOException ioe) {
-            // Remove the last file from the spilled array, since we failed to
-            // write to it.
-            mSpillFiles.remove(mSpillFiles.size() - 1);
-            warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
-            return 0;
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException e) {
-                    warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
-                }
-            }
-        }
-        mContents.clear();
-        memUsage = 0;
-        
-        // Increment the spill count
-        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
-        return spilled;
-    }
-
     /**
      * An iterator that handles getting the next tuple from the bag.  
      * Data can be stored in a combination of in memory and on disk.
@@ -523,4 +463,9 @@ public class InternalSortedBag extends D
             }
         }
     }
+
+    public long spill(){
+        return proactive_spill(mComp);
+    }
+
 }

Added: hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java?rev=986989&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java Thu Aug 19 00:07:36 2010
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.pig.PigCounters;
+import org.apache.pig.PigWarning;
+
+/**
+ * Common functionality for proactively spilling bags that need to keep the data
+ * sorted.
+ */
+public abstract class SortedSpillBag extends DefaultAbstractBag {
+
+    private static final long serialVersionUID = 1L;
+
+    // True once this bag has been counted in PROACTIVE_SPILL_COUNT_BAGS, so a
+    // bag that spills many times is still counted as a single bag.
+    private transient boolean spillCounted = false;
+
+    /**
+     * Sorts the contents of mContents and writes them to a new spill file.
+     * On success mContents is cleared and the proactive spill counters are
+     * updated. If the spill file cannot be created or written, a warning is
+     * issued, mContents is left as it was and no counter is changed.
+     * @param comp Comparator to sort contents of mContents; if null the
+     * tuples are sorted with Arrays.sort(Object[]), i.e. by natural ordering
+     * @return number of tuples spilled, 0 if nothing was spilled
+     */
+    public long proactive_spill(Comparator<Tuple> comp) {
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) {
+            return 0;
+        }
+
+        long spilled = 0;
+
+        DataOutputStream out = null;
+        try {
+            out = getSpillFile();
+        } catch (IOException ioe) {
+            // Do not remove last file from spilled array. It was not
+            // added as File.createTmpFile threw an IOException
+            warn(
+                "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+            return 0;
+        }
+        try {
+            // Sort the tuples.
+            // As per the documentation of Collections.sort(), it copies to an
+            // array, sorts and copies back to the collection.
+            // Avoid that extra copy back to the collection (mContents) by
+            // copying to an array and using Arrays.sort.
+            Tuple[] array = new Tuple[mContents.size()];
+            mContents.toArray(array);
+            if (comp == null) {
+                Arrays.sort(array);
+            } else {
+                Arrays.sort(array, comp);
+            }
+
+            // dump the array
+            for (Tuple t : array) {
+                t.write(out);
+                spilled++;
+                // Report progress every 16384 records.
+                if ((spilled & 0x3fff) == 0) {
+                    reportProgress();
+                }
+            }
+
+            out.flush();
+        } catch (IOException ioe) {
+            // Remove the last file from the spilled array, since we failed to
+            // write to it.
+            mSpillFiles.remove(mSpillFiles.size() - 1);
+            warn(
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+            return 0;
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                }
+            }
+        }
+        mContents.clear();
+
+        // Count the bag only once, and only after a spill has actually
+        // succeeded, so a bag whose spill attempts all failed is not
+        // reported as spilled.
+        if (!spillCounted) {
+            incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
+            spillCounted = true;
+        }
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
+
+        return spilled;
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Thu Aug 19 00:07:36 2010
@@ -107,7 +107,8 @@ public final class JobStats extends Oper
     private long hdfsBytesWritten = 0;
     private long hdfsBytesRead = 0;
     private long spillCount = 0;
-    private long activeSpillCount = 0;
+    private long activeSpillCountObj = 0;
+    private long activeSpillCountRecs = 0;
     
     private HashMap<String, Long> multiStoreCounters 
             = new HashMap<String, Long>();
@@ -162,7 +163,9 @@ public final class JobStats extends Oper
 
     public long getSMMSpillCount() { return spillCount; }
     
-    public long getProactiveSpillCount() { return activeSpillCount; }
+    public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
+    
+    public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
     
     public long getHdfsBytesWritten() { return hdfsBytesWritten; }
     
@@ -340,9 +343,11 @@ public final class JobStats extends Oper
             spillCount = counters.findCounter(
                     PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
                     .getCounter();
-            activeSpillCount = counters.findCounter(
-                    PigCounters.PROACTIVE_SPILL_COUNT).getCounter();
-            
+            activeSpillCountObj = counters.findCounter(
+                    PigCounters.PROACTIVE_SPILL_COUNT_BAGS).getCounter();
+            activeSpillCountRecs = counters.findCounter(
+                    PigCounters.PROACTIVE_SPILL_COUNT_RECS).getCounter();
+
             Iterator<Counter> iter = multistoregroup.iterator();
             while (iter.hasNext()) {
                 Counter cter = iter.next();

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Aug 19 00:07:36 2010
@@ -357,13 +357,25 @@ public final class PigStats {
     }
     
     /**
-     * Returns the total spill counts from {@link InternalCachedBag}.
+     * Returns the total number of bags that spilled proactively
      */
-    public long getProactiveSpillCount() {
+    public long getProactiveSpillCountObjects() {
         Iterator<JobStats> it = jobPlan.iterator();
         long ret = 0;
         while (it.hasNext()) {            
-            ret += it.next().getProactiveSpillCount();
+            ret += it.next().getProactiveSpillCountObjects();
+        }
+        return ret;
+    }
+    
+    /**
+     * Returns the total number of records that spilled proactively
+     */
+    public long getProactiveSpillCountRecords() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {            
+            ret += it.next().getProactiveSpillCountRecs();
         }
         return ret;
     }
@@ -587,8 +599,10 @@ public final class PigStats {
         sb.append("Total bytes written : " + getBytesWritten()).append("\n");
         sb.append("Spillable Memory Manager spill count : "
                 + getSMMSpillCount()).append("\n");
-        sb.append("Proactive spill count : " 
-                + getProactiveSpillCount()).append("\n");
+        sb.append("Total bags proactively spilled: " 
+                + getProactiveSpillCountObjects()).append("\n");
+        sb.append("Total records proactively spilled: " 
+                + getProactiveSpillCountRecords()).append("\n");
         
         sb.append("\nJob DAG:\n").append(jobPlan.toString());