You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/09/18 17:43:31 UTC

svn commit: r1524464 - in /pig/trunk: CHANGES.txt src/org/apache/pig/data/InternalDistinctBag.java

Author: cheolsoo
Date: Wed Sep 18 15:43:31 2013
New Revision: 1524464

URL: http://svn.apache.org/r1524464
Log:
PIG-3466: Race Conditions in InternalDistinctBag during proactive spill (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524464&r1=1524463&r2=1524464&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 18 15:43:31 2013
@@ -230,6 +230,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3466: Race Conditions in InternalDistinctBag during proactive spill (cheolsoo)
+
 PIG-3464: Mark ExecType and ExecutionEngine interfaces as evolving (cheolsoo)
 
 PIG-3454: Update JsonLoader/JsonStorage (tyro89 via daijy)

Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1524464&r1=1524463&r2=1524464&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Wed Sep 18 15:43:31 2013
@@ -49,8 +49,8 @@ import org.apache.pig.classification.Int
  * stored in a HashSet.  When it is time to sort it is placed in an
  * ArrayList and then sorted.  Dispite all these machinations, this was
  * found to be faster than storing it in a TreeSet.
- * 
- * This bag spills pro-actively when the number of tuples in memory 
+ *
+ * This bag spills pro-actively when the number of tuples in memory
  * reaches a limit
  */
 @InterfaceAudience.Private
@@ -58,58 +58,58 @@ import org.apache.pig.classification.Int
 public class InternalDistinctBag extends SortedSpillBag {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
     private static final Log log = LogFactory.getLog(InternalDistinctBag.class);
 
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
-    
-    private transient boolean mReadStarted = false; 
-    
+
+    private transient boolean mReadStarted = false;
+
     public InternalDistinctBag() {
         this(1, -1.0f);
     }
-    
-    public InternalDistinctBag(int bagCount) {        
-    	this(bagCount, -1.0f);
+
+    public InternalDistinctBag(int bagCount) {
+        this(bagCount, -1.0f);
     }
-    
-    public InternalDistinctBag(int bagCount, float percent) {        
+
+    public InternalDistinctBag(int bagCount, float percent) {
         super(bagCount, percent);
         if (percent < 0) {
-        	percent = 0.2F;            
-        	if (PigMapReduce.sJobConfInternal.get() != null) {
-        		String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
-        		if (usage != null) {
-        			percent = Float.parseFloat(usage);
-        		}
-        	}
+            percent = 0.2F;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+                if (usage != null) {
+                    percent = Float.parseFloat(usage);
+                }
+            }
         }
-           	
-        init(bagCount, percent);        
+               
+        init(bagCount, percent);
     }
 
     private void init(int bagCount, double percent) {
-    	mContents = new HashSet<Tuple>();      
+        mContents = new HashSet<Tuple>();
     }
-    
+
     @Override
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return true;
     }
-    
-    
+
+
     @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
-            //We need to racalculate size to guarantee a count of unique 
+            //We need to racalculate size to guarantee a count of unique
             //entries including those on disk
             Iterator<Tuple> iter = iterator();
             int newSize = 0;
@@ -117,44 +117,45 @@ public class InternalDistinctBag extends
                 newSize++;
                 iter.next();
             }
-            
+
             mSize = newSize;
         }
         return mSize;
     }
-    
-    
+
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
     }
 
     @Override
-    public void add(Tuple t) {        
-        
-        if(mReadStarted) {
-            throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
-        }
-                
-    	if (mContents.size() > memLimit.getCacheLimit()) {    		
-    		proactive_spill(null);
-    	}
-    	            	
-        if (mContents.add(t)) {
-        	mSize ++;
-                
-        	 // check how many tuples memory can hold by getting average
-            // size of first 100 tuples
-            if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
-                memLimit.addNewObjSize(t.getMemorySize());
-            }          
-        }    	
-        markSpillableIfNecessary();
+    public void add(Tuple t) {
+        synchronized(mContents) {
+            if(mReadStarted) {
+                throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
+            }
+
+            if (mContents.size() > memLimit.getCacheLimit()) {
+                proactive_spill(null);
+            }
+
+            if (mContents.add(t)) {
+                mSize ++;
+
+                // check how many tuples memory can hold by getting average
+                // size of first 100 tuples
+                if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
+                    memLimit.addNewObjSize(t.getMemorySize());
+                }
+            }
+            markSpillableIfNecessary();
+        }
     }
 
     /**
      * An iterator that handles getting the next tuple from the bag.
-     * Data can be stored in a combination of in memory and on disk.  
+     * Data can be stored in a combination of in memory and on disk.
      */
     private class DistinctDataBagIterator implements Iterator<Tuple> {
 
@@ -164,22 +165,22 @@ public class InternalDistinctBag extends
 
             @Override
             @SuppressWarnings("unchecked")
-			public int compareTo(TContainer other) {
+            public int compareTo(TContainer other) {
                 return tuple.compareTo(other.tuple);
             }
-            
+
             @Override
             public boolean equals(Object obj) {
-            	if (obj instanceof TContainer) {
-            		return compareTo((TContainer)obj) == 0;
-            	}
-            	
-            	return false;
+                if (obj instanceof TContainer) {
+                    return compareTo((TContainer)obj) == 0;
+                }
+
+                return false;
             }
-            
+
             @Override
             public int hashCode() {
-            	return tuple.hashCode();
+                return tuple.hashCode();
             }
         }
 
@@ -193,21 +194,23 @@ public class InternalDistinctBag extends
         private int mCntr = 0;
 
         @SuppressWarnings("unchecked")
-	DistinctDataBagIterator() {
-            // If this is the first read, we need to sort the data.            
-        	if (!mReadStarted) {
-                preMerge();
-                // We're the first reader, we need to sort the data.
-                // This is in case it gets dumped under us.
-                ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
-                Collections.sort(l);
-                mContents = l;
-                mReadStarted = true;
-            }            
+        DistinctDataBagIterator() {
+            // If this is the first read, we need to sort the data.
+            synchronized(mContents) {
+                if (!mReadStarted) {
+                    preMerge();
+                    // We're the first reader, we need to sort the data.
+                    // This is in case it gets dumped under us.
+                    ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+                    Collections.sort(l);
+                    mContents = l;
+                    mReadStarted = true;
+                }
+            }
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -226,11 +229,11 @@ public class InternalDistinctBag extends
                 return t;
             }
 
-            // Check to see if we just need to read from memory.                       
+            // Check to see if we just need to read from memory.
             if (mSpillFiles == null || mSpillFiles.size() == 0) {
                 return readFromMemory();
             }
-            
+
             // We have spill files, so we need to read the next tuple from
             // one of those files or from memory.
             return readFromTree();
@@ -255,7 +258,7 @@ public class InternalDistinctBag extends
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -299,7 +302,7 @@ public class InternalDistinctBag extends
             c.fileNum = fileNum;
 
             if (fileNum == -1) {
-                // Need to read from memory.  
+                // Need to read from memory.
                 do {
                     c.tuple = readFromMemory();
                     if (c.tuple != null) {
@@ -389,7 +392,7 @@ public class InternalDistinctBag extends
             // we'll be removing pieces from the middle and we want to do
             // it efficiently.
             try {
-                
+
                 LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
                 LinkedList<File> filesToDelete = new LinkedList<File>();
                 while (ll.size() > MAX_SPILL_FILES) {
@@ -436,18 +439,18 @@ public class InternalDistinctBag extends
                         throw new RuntimeException(msg, ioe);
                     }
                 }
-                
+
                 // delete files that have been merged into new files
                 for(File f : filesToDelete){
                     if( f.delete() == false){
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();
-                
+
                 // Now, move our new list back to the spill files array.
                 mSpillFiles = new FileList(ll);
             } finally {
@@ -461,7 +464,12 @@ public class InternalDistinctBag extends
 
     @Override
     public long spill(){
-        return proactive_spill(null);
+        synchronized(mContents) {
+            if (this.mReadStarted) {
+                return 0L;
+            }
+            return proactive_spill(null);
+        }
     }
 
 }