You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/03 19:43:34 UTC

svn commit: r981984 - in /hadoop/pig/trunk: ./ src/org/apache/pig/data/

Author: thejas
Date: Tue Aug  3 17:43:33 2010
New Revision: 981984

URL: http://svn.apache.org/viewvc?rev=981984&view=rev
Log:
PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/FileList.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Aug  3 17:43:33 2010
@@ -110,6 +110,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
+
 PIG-1521: explain plan does not show correct Physical operator in MR plan when POSortedDistinct, POPackageLite are used (thejas)
 
 PIG-1513: Pig doesn't handle empty input directory (rding)

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Tue Aug  3 17:43:33 2010
@@ -57,7 +57,7 @@ public abstract class DefaultAbstractBag
     protected Collection<Tuple> mContents;
 
     // Spill files we've created.  These need to be removed in finalize.
-    protected ArrayList<File> mSpillFiles;
+    protected FileList mSpillFiles;
 
     // Total size, including tuples on disk.  Stored here so we don't have
     // to run through the disk when people ask.
@@ -317,21 +317,6 @@ public abstract class DefaultAbstractBag
     }
 
     /**
-     * Need to override finalize to clean out the mSpillFiles array.
-     */
-    @Override
-    protected void finalize() {
-        if (mSpillFiles != null) {
-            for (int i = 0; i < mSpillFiles.size(); i++) {
-                boolean res = mSpillFiles.get(i).delete();
-                if (!res)
-                    warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
-                    
-            }
-        }
-    }
-
-    /**
      * Get a file to spill contents to.  The file will be registered in the
      * mSpillFiles array.
      * @return stream to write tuples to.
@@ -339,7 +324,7 @@ public abstract class DefaultAbstractBag
     protected DataOutputStream getSpillFile() throws IOException {
         if (mSpillFiles == null) {
             // We want to keep the list as small as possible.
-            mSpillFiles = new ArrayList<File>(1);
+            mSpillFiles = new FileList(1);
         }
 
         String tmpDirName= System.getProperties().getProperty("java.io.tmpdir") ;                

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Aug  3 17:43:33 2010
@@ -38,7 +38,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 
 
 
@@ -492,6 +491,7 @@ public class DistinctDataBag extends Def
             // it efficiently.
             try {
                 LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                LinkedList<File> filesToDelete = new LinkedList<File>();
                 while (ll.size() > MAX_SPILL_FILES) {
                     ListIterator<File> i = ll.listIterator();
                     mStreams =
@@ -500,12 +500,15 @@ public class DistinctDataBag extends Def
 
                     for (int j = 0; j < MAX_SPILL_FILES; j++) {
                         try {
+                            File f = i.next();
                             DataInputStream in =
                                 new DataInputStream(new BufferedInputStream(
-                                    new FileInputStream(i.next())));
+                                    new FileInputStream(f)));
                             mStreams.add(in);
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
+                            filesToDelete.add(f);
+                            
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -534,9 +537,19 @@ public class DistinctDataBag extends Def
                         throw new RuntimeException(msg, ioe);
                     }
                 }
+                // delete files that have been merged into new files
+                for(File f : filesToDelete){
+                    if( f.delete() == false){
+                        log.warn("Failed to delete spill file: " + f.getPath());
+                    }
+                }
+                
+                // clear the old list so that its finalize() does not delete the
+                // files still in use when mSpillFiles is assigned a new value
+                mSpillFiles.clear();
 
                 // Now, move our new list back to the spill files array.
-                mSpillFiles = new ArrayList<File>(ll);
+                mSpillFiles = new FileList(ll);
             } finally {
                 // Reset mStreams and mMerge so that they'll be allocated
                 // properly for regular merging.

Added: hadoop/pig/trunk/src/org/apache/pig/data/FileList.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/FileList.java?rev=981984&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/FileList.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/FileList.java Tue Aug  3 17:43:33 2010
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class extends ArrayList<File> to add a finalize() that
+ * calls delete() on each of the files.
+ * This helps in getting rid of the finalize() in classes such
+ * as DefaultAbstractBag, so they can be freed without waiting
+ * for finalize to be called. Only when such a class has spilled to
+ * disk is there an instance of this class that needs to be finalized.
+ *
+ * CAUTION: if you assign a new value to a variable of this type,
+ * the files (if any) in the old object it pointed to will be scheduled for
+ * deletion. To avoid that, call .clear() before assigning a new value.
+ */
+public class FileList extends ArrayList<File> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log log = LogFactory.getLog(FileList.class);
+    /** Creates an empty list; {@code i} is handed to the ArrayList constructor as its initial capacity. */
+    public FileList(int i) {
+        super(i);
+    }
+    /** Creates an empty list using the default ArrayList constructor. */
+    public FileList(){
+    }
+    /** Creates a list holding the files of {@code ll}, copied by the ArrayList collection constructor. */
+    public FileList(LinkedList<File> ll) {
+        super(ll);
+    }
+    /** Calls delete() on every file still in this list and logs a warning for each one that fails. */
+    @Override
+    protected void finalize(){
+        for(File f : this){
+            if(f.delete() == false){
+                log.warn("Failed to delete file: " + f.getPath());
+            }
+        }
+    }
+    
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Aug  3 17:43:33 2010
@@ -23,20 +23,20 @@ import java.util.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
 
 public class InternalCachedBag extends DefaultAbstractBag {
-	private static final long serialVersionUID = 1L;
-	
-	private static final Log log = LogFactory.getLog(InternalCachedBag.class);
+    private static final long serialVersionUID = 1L;
+
+    private static final Log log = LogFactory.getLog(InternalCachedBag.class);
     private transient int cacheLimit;
     private transient long maxMemUsage;
     private transient long memUsage;
     private transient DataOutputStream out;
     private transient boolean addDone;
     private transient TupleFactory factory;
-
  
     public InternalCachedBag() {
         this(1);
@@ -145,20 +145,12 @@ public class InternalCachedBag extends D
 
     public void clear() {
     	if (!addDone) {
-    		addDone();
+    	    addDone();
     	}
         super.clear();
         addDone = false;
         out = null;
     }
-
-    protected void finalize() {
-    	if (!addDone) {
-    		// close the spill file so it can be deleted
-    		addDone();
-    	}
-    	super.finalize();
-    }
     
     public boolean isDistinct() {
         return false;
@@ -173,8 +165,8 @@ public class InternalCachedBag extends D
     		// close the spill file and mark adding is done
     		// so further adding is disallowed.
     		addDone();
-        }        
-        return new CachedBagIterator();
+        }
+    	return new CachedBagIterator();
     }
 
     public long spill()
@@ -202,11 +194,12 @@ public class InternalCachedBag extends D
         }
 
 
+
         public boolean hasNext() {
-        	if (next != null) {
-        		return true;        		
-        	}
-        	
+            if (next != null) {
+                return true;        		
+            }
+
             if(iter.hasNext()){
                 next = iter.next();
                 return true;
@@ -236,32 +229,21 @@ public class InternalCachedBag extends D
         }
 
         public Tuple next() {  
-        	if (next == null) {
-        		if (!hasNext()) {
-        			throw new IllegalStateException("No more elements from iterator");
-        		}
-        	}
-        	Tuple t = next;
-        	next = null;
-        	
-        	return t;
+            if (next == null) {
+                if (!hasNext()) {
+                    throw new NoSuchElementException("No more elements from iterator");
+                }
+            }
+            Tuple t = next;
+            next = null;
+
+            return t;
         }
 
         public void remove() {
         	throw new UnsupportedOperationException("remove is not supported for CachedBagIterator");
         }
 
-        protected void finalize() {
-            if(in != null) {
-                try
-                {
-                    in.close();
-                }
-                catch(Exception e) { 
-                	
-                }
-            }
-        }
     }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Tue Aug  3 17:43:33 2010
@@ -464,7 +464,9 @@ public class InternalDistinctBag extends
             // we'll be removing pieces from the middle and we want to do
             // it efficiently.
             try {
+                
                 LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                LinkedList<File> filesToDelete = new LinkedList<File>();
                 while (ll.size() > MAX_SPILL_FILES) {
                     ListIterator<File> i = ll.listIterator();
                     mStreams =
@@ -473,12 +475,14 @@ public class InternalDistinctBag extends
 
                     for (int j = 0; j < MAX_SPILL_FILES; j++) {
                         try {
+                            File f = i.next();
                             DataInputStream in =
                                 new DataInputStream(new BufferedInputStream(
-                                    new FileInputStream(i.next())));
+                                    new FileInputStream(f)));
                             mStreams.add(in);
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
+                            filesToDelete.add(f);
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -507,9 +511,20 @@ public class InternalDistinctBag extends
                         throw new RuntimeException(msg, ioe);
                     }
                 }
-
+                
+                // delete files that have been merged into new files
+                for(File f : filesToDelete){
+                    if( f.delete() == false){
+                        log.warn("Failed to delete spill file: " + f.getPath());
+                    }
+                }
+                
+                // clear the old list so that its finalize() does not delete the
+                // files still in use when mSpillFiles is assigned a new value
+                mSpillFiles.clear();
+                
                 // Now, move our new list back to the spill files array.
-                mSpillFiles = new ArrayList<File>(ll);
+                mSpillFiles = new FileList(ll);
             } finally {
                 // Reset mStreams and mMerge so that they'll be allocated
                 // properly for regular merging.

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Tue Aug  3 17:43:33 2010
@@ -456,6 +456,7 @@ public class InternalSortedBag extends D
             // it efficiently.
             try {
                 LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                LinkedList<File> filesToDelete = new LinkedList<File>();
                 while (ll.size() > MAX_SPILL_FILES) {
                     ListIterator<File> i = ll.listIterator();
                     mStreams =
@@ -464,12 +465,15 @@ public class InternalSortedBag extends D
 
                     for (int j = 0; j < MAX_SPILL_FILES; j++) {
                         try {
+                            File f = i.next();
                             DataInputStream in =
                                 new DataInputStream(new BufferedInputStream(
-                                    new FileInputStream(i.next())));
+                                    new FileInputStream(f)));
                             mStreams.add(in);
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
+                            filesToDelete.add(f);
+                            
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -498,9 +502,19 @@ public class InternalSortedBag extends D
                         throw new RuntimeException(msg, ioe);
                     }
                 }
+                // delete files that have been merged into new files
+                for(File f : filesToDelete){
+                    if( f.delete() == false){
+                        log.warn("Failed to delete spill file: " + f.getPath());
+                    }
+                }
+                
+                // clear the old list so that its finalize() does not delete the
+                // files still in use when mSpillFiles is assigned a new value
+                mSpillFiles.clear();
 
                 // Now, move our new list back to the spill files array.
-                mSpillFiles = new ArrayList<File>(ll);
+                mSpillFiles = new FileList(ll);
             } finally {
                 // Reset mStreams and mMerge so that they'll be allocated
                 // properly for regular merging.

Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Aug  3 17:43:33 2010
@@ -450,6 +450,7 @@ public class SortedDataBag extends Defau
             // it efficiently.
             try {
                 LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                LinkedList<File> filesToDelete = new LinkedList<File>();
                 while (ll.size() > MAX_SPILL_FILES) {
                     ListIterator<File> i = ll.listIterator();
                     mStreams =
@@ -458,12 +459,14 @@ public class SortedDataBag extends Defau
 
                     for (int j = 0; j < MAX_SPILL_FILES; j++) {
                         try {
+                            File f = i.next();
                             DataInputStream in =
                                 new DataInputStream(new BufferedInputStream(
-                                    new FileInputStream(i.next())));
+                                    new FileInputStream(f)));
                             mStreams.add(in);
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
+                            filesToDelete.add(f);
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -492,9 +495,20 @@ public class SortedDataBag extends Defau
                         throw new RuntimeException(msg, ioe);
                     }
                 }
+                // delete files that have been merged into new files
+                for(File f : filesToDelete){
+                    if( f.delete() == false){
+                        log.warn("Failed to delete spill file: " + f.getPath());
+                    }
+                }
+                
+                // clear the old list so that its finalize() does not delete the
+                // files still in use when mSpillFiles is assigned a new value
+                mSpillFiles.clear();
 
                 // Now, move our new list back to the spill files array.
-                mSpillFiles = new ArrayList<File>(ll);
+                mSpillFiles = new FileList(ll);
+
             } finally {
                 // Reset mStreams and mMerge so that they'll be allocated
                 // properly for regular merging.