You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/09/18 17:43:31 UTC
svn commit: r1524464 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/data/InternalDistinctBag.java
Author: cheolsoo
Date: Wed Sep 18 15:43:31 2013
New Revision: 1524464
URL: http://svn.apache.org/r1524464
Log:
PIG-3466: Race Conditions in InternalDistinctBag during proactive spill (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524464&r1=1524463&r2=1524464&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 18 15:43:31 2013
@@ -230,6 +230,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3466: Race Conditions in InternalDistinctBag during proactive spill (cheolsoo)
+
PIG-3464: Mark ExecType and ExecutionEngine interfaces as evolving (cheolsoo)
PIG-3454: Update JsonLoader/JsonStorage (tyro89 via daijy)
Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1524464&r1=1524463&r2=1524464&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Wed Sep 18 15:43:31 2013
@@ -49,8 +49,8 @@ import org.apache.pig.classification.Int
* stored in a HashSet. When it is time to sort it is placed in an
* ArrayList and then sorted. Dispite all these machinations, this was
* found to be faster than storing it in a TreeSet.
- *
- * This bag spills pro-actively when the number of tuples in memory
+ *
+ * This bag spills pro-actively when the number of tuples in memory
* reaches a limit
*/
@InterfaceAudience.Private
@@ -58,58 +58,58 @@ import org.apache.pig.classification.Int
public class InternalDistinctBag extends SortedSpillBag {
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
private static final Log log = LogFactory.getLog(InternalDistinctBag.class);
private static TupleFactory gTupleFactory = TupleFactory.getInstance();
-
- private transient boolean mReadStarted = false;
-
+
+ private transient boolean mReadStarted = false;
+
public InternalDistinctBag() {
this(1, -1.0f);
}
-
- public InternalDistinctBag(int bagCount) {
- this(bagCount, -1.0f);
+
+ public InternalDistinctBag(int bagCount) {
+ this(bagCount, -1.0f);
}
-
- public InternalDistinctBag(int bagCount, float percent) {
+
+ public InternalDistinctBag(int bagCount, float percent) {
super(bagCount, percent);
if (percent < 0) {
- percent = 0.2F;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
- if (usage != null) {
- percent = Float.parseFloat(usage);
- }
- }
+ percent = 0.2F;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+ if (usage != null) {
+ percent = Float.parseFloat(usage);
+ }
+ }
}
-
- init(bagCount, percent);
+
+ init(bagCount, percent);
}
private void init(int bagCount, double percent) {
- mContents = new HashSet<Tuple>();
+ mContents = new HashSet<Tuple>();
}
-
+
@Override
public boolean isSorted() {
return false;
}
-
+
@Override
public boolean isDistinct() {
return true;
}
-
-
+
+
@Override
public long size() {
if (mSpillFiles != null && mSpillFiles.size() > 0){
- //We need to racalculate size to guarantee a count of unique
+ //We need to racalculate size to guarantee a count of unique
//entries including those on disk
Iterator<Tuple> iter = iterator();
int newSize = 0;
@@ -117,44 +117,45 @@ public class InternalDistinctBag extends
newSize++;
iter.next();
}
-
+
mSize = newSize;
}
return mSize;
}
-
-
+
+
@Override
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
}
@Override
- public void add(Tuple t) {
-
- if(mReadStarted) {
- throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
- }
-
- if (mContents.size() > memLimit.getCacheLimit()) {
- proactive_spill(null);
- }
-
- if (mContents.add(t)) {
- mSize ++;
-
- // check how many tuples memory can hold by getting average
- // size of first 100 tuples
- if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
- memLimit.addNewObjSize(t.getMemorySize());
- }
- }
- markSpillableIfNecessary();
+ public void add(Tuple t) {
+ synchronized(mContents) {
+ if(mReadStarted) {
+ throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
+ }
+
+ if (mContents.size() > memLimit.getCacheLimit()) {
+ proactive_spill(null);
+ }
+
+ if (mContents.add(t)) {
+ mSize ++;
+
+ // check how many tuples memory can hold by getting average
+ // size of first 100 tuples
+ if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
+ memLimit.addNewObjSize(t.getMemorySize());
+ }
+ }
+ markSpillableIfNecessary();
+ }
}
/**
* An iterator that handles getting the next tuple from the bag.
- * Data can be stored in a combination of in memory and on disk.
+ * Data can be stored in a combination of in memory and on disk.
*/
private class DistinctDataBagIterator implements Iterator<Tuple> {
@@ -164,22 +165,22 @@ public class InternalDistinctBag extends
@Override
@SuppressWarnings("unchecked")
- public int compareTo(TContainer other) {
+ public int compareTo(TContainer other) {
return tuple.compareTo(other.tuple);
}
-
+
@Override
public boolean equals(Object obj) {
- if (obj instanceof TContainer) {
- return compareTo((TContainer)obj) == 0;
- }
-
- return false;
+ if (obj instanceof TContainer) {
+ return compareTo((TContainer)obj) == 0;
+ }
+
+ return false;
}
-
+
@Override
public int hashCode() {
- return tuple.hashCode();
+ return tuple.hashCode();
}
}
@@ -193,21 +194,23 @@ public class InternalDistinctBag extends
private int mCntr = 0;
@SuppressWarnings("unchecked")
- DistinctDataBagIterator() {
- // If this is the first read, we need to sort the data.
- if (!mReadStarted) {
- preMerge();
- // We're the first reader, we need to sort the data.
- // This is in case it gets dumped under us.
- ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
- Collections.sort(l);
- mContents = l;
- mReadStarted = true;
- }
+ DistinctDataBagIterator() {
+ // If this is the first read, we need to sort the data.
+ synchronized(mContents) {
+ if (!mReadStarted) {
+ preMerge();
+ // We're the first reader, we need to sort the data.
+ // This is in case it gets dumped under us.
+ ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+ Collections.sort(l);
+ mContents = l;
+ mReadStarted = true;
+ }
+ }
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
@@ -226,11 +229,11 @@ public class InternalDistinctBag extends
return t;
}
- // Check to see if we just need to read from memory.
+ // Check to see if we just need to read from memory.
if (mSpillFiles == null || mSpillFiles.size() == 0) {
return readFromMemory();
}
-
+
// We have spill files, so we need to read the next tuple from
// one of those files or from memory.
return readFromTree();
@@ -255,7 +258,7 @@ public class InternalDistinctBag extends
Iterator<File> i = mSpillFiles.iterator();
while (i.hasNext()) {
try {
- DataInputStream in =
+ DataInputStream in =
new DataInputStream(new BufferedInputStream(
new FileInputStream(i.next())));
mStreams.add(in);
@@ -299,7 +302,7 @@ public class InternalDistinctBag extends
c.fileNum = fileNum;
if (fileNum == -1) {
- // Need to read from memory.
+ // Need to read from memory.
do {
c.tuple = readFromMemory();
if (c.tuple != null) {
@@ -389,7 +392,7 @@ public class InternalDistinctBag extends
// we'll be removing pieces from the middle and we want to do
// it efficiently.
try {
-
+
LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
LinkedList<File> filesToDelete = new LinkedList<File>();
while (ll.size() > MAX_SPILL_FILES) {
@@ -436,18 +439,18 @@ public class InternalDistinctBag extends
throw new RuntimeException(msg, ioe);
}
}
-
+
// delete files that have been merged into new files
for(File f : filesToDelete){
if( f.delete() == false){
log.warn("Failed to delete spill file: " + f.getPath());
}
}
-
+
// clear the list, so that finalize does not delete any files,
// when mSpillFiles is assigned a new value
mSpillFiles.clear();
-
+
// Now, move our new list back to the spill files array.
mSpillFiles = new FileList(ll);
} finally {
@@ -461,7 +464,12 @@ public class InternalDistinctBag extends
@Override
public long spill(){
- return proactive_spill(null);
+ synchronized(mContents) {
+ if (this.mReadStarted) {
+ return 0L;
+ }
+ return proactive_spill(null);
+ }
}
}