You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/03 19:43:34 UTC
svn commit: r981984 - in /hadoop/pig/trunk: ./ src/org/apache/pig/data/
Author: thejas
Date: Tue Aug 3 17:43:33 2010
New Revision: 981984
URL: http://svn.apache.org/viewvc?rev=981984&view=rev
Log:
PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
Added:
hadoop/pig/trunk/src/org/apache/pig/data/FileList.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Aug 3 17:43:33 2010
@@ -110,6 +110,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
+
PIG-1521: explain plan does not show correct Physical operator in MR plan when POSortedDistinct, POPackageLite are used (thejas)
PIG-1513: Pig doesn't handle empty input directory (rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Tue Aug 3 17:43:33 2010
@@ -57,7 +57,7 @@ public abstract class DefaultAbstractBag
protected Collection<Tuple> mContents;
// Spill files we've created. These need to be removed in finalize.
- protected ArrayList<File> mSpillFiles;
+ protected FileList mSpillFiles;
// Total size, including tuples on disk. Stored here so we don't have
// to run through the disk when people ask.
@@ -317,21 +317,6 @@ public abstract class DefaultAbstractBag
}
/**
- * Need to override finalize to clean out the mSpillFiles array.
- */
- @Override
- protected void finalize() {
- if (mSpillFiles != null) {
- for (int i = 0; i < mSpillFiles.size(); i++) {
- boolean res = mSpillFiles.get(i).delete();
- if (!res)
- warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
-
- }
- }
- }
-
- /**
* Get a file to spill contents to. The file will be registered in the
* mSpillFiles array.
* @return stream to write tuples to.
@@ -339,7 +324,7 @@ public abstract class DefaultAbstractBag
protected DataOutputStream getSpillFile() throws IOException {
if (mSpillFiles == null) {
// We want to keep the list as small as possible.
- mSpillFiles = new ArrayList<File>(1);
+ mSpillFiles = new FileList(1);
}
String tmpDirName= System.getProperties().getProperty("java.io.tmpdir") ;
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Aug 3 17:43:33 2010
@@ -38,7 +38,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
@@ -492,6 +491,7 @@ public class DistinctDataBag extends Def
// it efficiently.
try {
LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ LinkedList<File> filesToDelete = new LinkedList<File>();
while (ll.size() > MAX_SPILL_FILES) {
ListIterator<File> i = ll.listIterator();
mStreams =
@@ -500,12 +500,15 @@ public class DistinctDataBag extends Def
for (int j = 0; j < MAX_SPILL_FILES; j++) {
try {
+ File f = i.next();
DataInputStream in =
new DataInputStream(new BufferedInputStream(
- new FileInputStream(i.next())));
+ new FileInputStream(f)));
mStreams.add(in);
addToQueue(null, mStreams.size() - 1);
i.remove();
+ filesToDelete.add(f);
+
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
@@ -534,9 +537,19 @@ public class DistinctDataBag extends Def
throw new RuntimeException(msg, ioe);
}
}
+ // delete files that have been merged into new files
+ for(File f : filesToDelete){
+ if( f.delete() == false){
+ log.warn("Failed to delete spill file: " + f.getPath());
+ }
+ }
+
+ // clear the list, so that finalize does not delete any files,
+ // when mSpillFiles is assigned a new value
+ mSpillFiles.clear();
// Now, move our new list back to the spill files array.
- mSpillFiles = new ArrayList<File>(ll);
+ mSpillFiles = new FileList(ll);
} finally {
// Reset mStreams and mMerge so that they'll be allocated
// properly for regular merging.
Added: hadoop/pig/trunk/src/org/apache/pig/data/FileList.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/FileList.java?rev=981984&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/FileList.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/FileList.java Tue Aug 3 17:43:33 2010
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An ArrayList of spill files whose finalize() deletes every file still in
+ * the list once the list itself is garbage collected.
+ * Moving the finalizer here lets bag classes such as DefaultAbstractBag drop
+ * their own finalize(), so a bag can be reclaimed without first waiting for
+ * finalization. Only bags that have spilled to disk create a FileList, so
+ * only those bags leave a finalizable object behind.
+ *
+ * CAUTION: if a variable of this type is reassigned, the old list may become
+ * unreachable, and when it is finalized every file it still holds is deleted.
+ * Call .clear() on the old list first when its files must be kept.
+ */
+public class FileList extends ArrayList<File> {
+ // ArrayList is Serializable, so pin an explicit serialVersionUID.
+ private static final long serialVersionUID = 1L;
+ private static final Log log = LogFactory.getLog(FileList.class);
+ // Pre-sized list; DefaultAbstractBag.getSpillFile() starts with capacity 1.
+ public FileList(int i) {
+ super(i);
+ }
+ // Empty list with ArrayList's default initial capacity.
+ public FileList(){
+ }
+ // Copy of ll; the spill-file merge code uses this for the files that remain.
+ public FileList(LinkedList<File> ll) { // NOTE(review): Collection<? extends File> would accept any source
+ super(ll);
+ }
+ // Best-effort cleanup of every file still in the list; failures are only logged.
+ @Override
+ protected void finalize(){ // NOTE(review): finalize() is deprecated since Java 9; consider java.lang.ref.Cleaner
+ for(File f : this){
+ if(f.delete() == false){
+ log.warn("Failed to delete file: " + f.getPath());
+ }
+ }
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Aug 3 17:43:33 2010
@@ -23,20 +23,20 @@ import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
+import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
public class InternalCachedBag extends DefaultAbstractBag {
- private static final long serialVersionUID = 1L;
-
- private static final Log log = LogFactory.getLog(InternalCachedBag.class);
+ private static final long serialVersionUID = 1L;
+
+ private static final Log log = LogFactory.getLog(InternalCachedBag.class);
private transient int cacheLimit;
private transient long maxMemUsage;
private transient long memUsage;
private transient DataOutputStream out;
private transient boolean addDone;
private transient TupleFactory factory;
-
public InternalCachedBag() {
this(1);
@@ -145,20 +145,12 @@ public class InternalCachedBag extends D
public void clear() {
if (!addDone) {
- addDone();
+ addDone();
}
super.clear();
addDone = false;
out = null;
}
-
- protected void finalize() {
- if (!addDone) {
- // close the spill file so it can be deleted
- addDone();
- }
- super.finalize();
- }
public boolean isDistinct() {
return false;
@@ -173,8 +165,8 @@ public class InternalCachedBag extends D
// close the spill file and mark adding is done
// so further adding is disallowed.
addDone();
- }
- return new CachedBagIterator();
+ }
+ return new CachedBagIterator();
}
public long spill()
@@ -202,11 +194,12 @@ public class InternalCachedBag extends D
}
+
public boolean hasNext() {
- if (next != null) {
- return true;
- }
-
+ if (next != null) {
+ return true;
+ }
+
if(iter.hasNext()){
next = iter.next();
return true;
@@ -236,32 +229,21 @@ public class InternalCachedBag extends D
}
public Tuple next() {
- if (next == null) {
- if (!hasNext()) {
- throw new IllegalStateException("No more elements from iterator");
- }
- }
- Tuple t = next;
- next = null;
-
- return t;
+ if (next == null) {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements from iterator");
+ }
+ }
+ Tuple t = next;
+ next = null;
+
+ return t;
}
public void remove() {
throw new UnsupportedOperationException("remove is not supported for CachedBagIterator");
}
- protected void finalize() {
- if(in != null) {
- try
- {
- in.close();
- }
- catch(Exception e) {
-
- }
- }
- }
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Tue Aug 3 17:43:33 2010
@@ -464,7 +464,9 @@ public class InternalDistinctBag extends
// we'll be removing pieces from the middle and we want to do
// it efficiently.
try {
+
LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ LinkedList<File> filesToDelete = new LinkedList<File>();
while (ll.size() > MAX_SPILL_FILES) {
ListIterator<File> i = ll.listIterator();
mStreams =
@@ -473,12 +475,14 @@ public class InternalDistinctBag extends
for (int j = 0; j < MAX_SPILL_FILES; j++) {
try {
+ File f = i.next();
DataInputStream in =
new DataInputStream(new BufferedInputStream(
- new FileInputStream(i.next())));
+ new FileInputStream(f)));
mStreams.add(in);
addToQueue(null, mStreams.size() - 1);
i.remove();
+ filesToDelete.add(f);
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
@@ -507,9 +511,20 @@ public class InternalDistinctBag extends
throw new RuntimeException(msg, ioe);
}
}
-
+
+ // delete files that have been merged into new files
+ for(File f : filesToDelete){
+ if( f.delete() == false){
+ log.warn("Failed to delete spill file: " + f.getPath());
+ }
+ }
+
+ // clear the list, so that finalize does not delete any files,
+ // when mSpillFiles is assigned a new value
+ mSpillFiles.clear();
+
// Now, move our new list back to the spill files array.
- mSpillFiles = new ArrayList<File>(ll);
+ mSpillFiles = new FileList(ll);
} finally {
// Reset mStreams and mMerge so that they'll be allocated
// properly for regular merging.
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Tue Aug 3 17:43:33 2010
@@ -456,6 +456,7 @@ public class InternalSortedBag extends D
// it efficiently.
try {
LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ LinkedList<File> filesToDelete = new LinkedList<File>();
while (ll.size() > MAX_SPILL_FILES) {
ListIterator<File> i = ll.listIterator();
mStreams =
@@ -464,12 +465,15 @@ public class InternalSortedBag extends D
for (int j = 0; j < MAX_SPILL_FILES; j++) {
try {
+ File f = i.next();
DataInputStream in =
new DataInputStream(new BufferedInputStream(
- new FileInputStream(i.next())));
+ new FileInputStream(f)));
mStreams.add(in);
addToQueue(null, mStreams.size() - 1);
i.remove();
+ filesToDelete.add(f);
+
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
@@ -498,9 +502,19 @@ public class InternalSortedBag extends D
throw new RuntimeException(msg, ioe);
}
}
+ // delete files that have been merged into new files
+ for(File f : filesToDelete){
+ if( f.delete() == false){
+ log.warn("Failed to delete spill file: " + f.getPath());
+ }
+ }
+
+ // clear the list, so that finalize does not delete any files,
+ // when mSpillFiles is assigned a new value
+ mSpillFiles.clear();
// Now, move our new list back to the spill files array.
- mSpillFiles = new ArrayList<File>(ll);
+ mSpillFiles = new FileList(ll);
} finally {
// Reset mStreams and mMerge so that they'll be allocated
// properly for regular merging.
Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=981984&r1=981983&r2=981984&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Aug 3 17:43:33 2010
@@ -450,6 +450,7 @@ public class SortedDataBag extends Defau
// it efficiently.
try {
LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ LinkedList<File> filesToDelete = new LinkedList<File>();
while (ll.size() > MAX_SPILL_FILES) {
ListIterator<File> i = ll.listIterator();
mStreams =
@@ -458,12 +459,14 @@ public class SortedDataBag extends Defau
for (int j = 0; j < MAX_SPILL_FILES; j++) {
try {
+ File f = i.next();
DataInputStream in =
new DataInputStream(new BufferedInputStream(
- new FileInputStream(i.next())));
+ new FileInputStream(f)));
mStreams.add(in);
addToQueue(null, mStreams.size() - 1);
i.remove();
+ filesToDelete.add(f);
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
@@ -492,9 +495,20 @@ public class SortedDataBag extends Defau
throw new RuntimeException(msg, ioe);
}
}
+ // delete files that have been merged into new files
+ for(File f : filesToDelete){
+ if( f.delete() == false){
+ log.warn("Failed to delete spill file: " + f.getPath());
+ }
+ }
+
+ // clear the list, so that finalize does not delete any files,
+ // when mSpillFiles is assigned a new value
+ mSpillFiles.clear();
// Now, move our new list back to the spill files array.
- mSpillFiles = new ArrayList<File>(ll);
+ mSpillFiles = new FileList(ll);
+
} finally {
// Reset mStreams and mMerge so that they'll be allocated
// properly for regular merging.