You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/18 15:39:31 UTC

svn commit: r1779347 - in /pig/branches/branch-0.16: ./ src/org/apache/pig/data/ test/org/apache/pig/test/

Author: rohini
Date: Wed Jan 18 15:39:31 2017
New Revision: 1779347

URL: http://svn.apache.org/viewvc?rev=1779347&view=rev
Log:
PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java
    pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java
    pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java
    pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java
    pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java
    pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 18 15:39:31 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
+
 PIG-4918: Pig on Tez cannot switch pig.temp.dir to another fs (daijy)
 
 PIG-5078: Script fails with error - POStoreTez only accepts MROutput (rohini)

Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java Wed Jan 18 15:39:31 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
     }
 
     @SuppressWarnings("rawtypes")
-    protected void warn(String msg, Enum warningEnum, Exception e) {
+    protected void warn(String msg, Enum warningEnum, Throwable e) {
         pigLogger = PhysicalOperator.getPigLogger();
         if(pigLogger != null) {
             pigLogger.warn(this, msg, warningEnum);

Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java Wed Jan 18 15:39:31 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.io.FileNotFoundException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
 public class DefaultDataBag extends DefaultAbstractBag {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-    
+
     private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
 
     public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DefaultDataBagIterator();
@@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
             if (hasCachedTuple)
                 return (mBuf != null);
@@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa
                         log.fatal(msg, eof);
                         throw new RuntimeException(msg, eof);
                     } catch (IOException ioe) {
-                        String msg = "Unable to read our spill file."; 
+                        String msg = "Unable to read our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
                     }
@@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa
                         log.warn("Failed to close spill file.", e);
                     }
                 } catch (IOException ioe) {
-                    String msg = "Unable to read our spill file."; 
+                    String msg = "Unable to read our spill file.";
                     log.fatal(msg, ioe);
                     throw new RuntimeException(msg, ioe);
                 }

Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java Wed Jan 18 15:39:31 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return true;
     }
-    
-    
+
+
     @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
-            //We need to racalculate size to guarantee a count of unique 
+            //We need to racalculate size to guarantee a count of unique
             //entries including those on disk
             Iterator<Tuple> iter = iterator();
             int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
                 newSize++;
                 iter.next();
             }
-            
+
             synchronized(mContents) {
                 //we don't want adds to change our numbers
                 //the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
         }
         return mSize;
     }
-    
-    
+
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
@@ -155,12 +155,15 @@ public class DistinctDataBag extends Def
                     }
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -171,7 +174,6 @@ public class DistinctDataBag extends Def
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -208,7 +210,7 @@ public class DistinctDataBag extends Def
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -237,7 +239,7 @@ public class DistinctDataBag extends Def
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -295,7 +297,7 @@ public class DistinctDataBag extends Def
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -346,7 +348,7 @@ public class DistinctDataBag extends Def
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -502,7 +504,7 @@ public class DistinctDataBag extends Def
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
                             filesToDelete.add(f);
-                            
+
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -545,7 +547,7 @@ public class DistinctDataBag extends Def
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();
@@ -560,6 +562,6 @@ public class DistinctDataBag extends Def
             }
         }
     }
-    
+
 }
 

Modified: pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java Wed Jan 18 15:39:31 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-  
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
  * stored unsorted as it comes in, and only sorted when it is time to dump
  * it to a file or when the first iterator is requested.  Experementation
  * found this to be the faster than storing it sorted to begin with.
- * 
+ *
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DefaultAbstractBag{
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
 
         @Override
         public int hashCode() {
-            return 42; 
+            return 42;
         }
 
     }
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
     public boolean isSorted() {
         return true;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new SortedDataBagIterator();
@@ -145,12 +145,15 @@ public class SortedDataBag extends Defau
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -161,7 +164,6 @@ public class SortedDataBag extends Defau
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -203,7 +205,7 @@ public class SortedDataBag extends Defau
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -228,7 +230,7 @@ public class SortedDataBag extends Defau
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -341,7 +343,7 @@ public class SortedDataBag extends Defau
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -351,7 +353,7 @@ public class SortedDataBag extends Defau
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        String msg = "Unable to find our spill file."; 
+                        String msg = "Unable to find our spill file.";
                         log.fatal(msg, fnfe);
                         throw new RuntimeException(msg, fnfe);
                     }
@@ -411,7 +413,7 @@ public class SortedDataBag extends Defau
                         in.close();
                     }catch(IOException e) {
                         log.warn("Failed to close spill file.", e);
-                    }                	
+                    }
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -518,7 +520,7 @@ public class SortedDataBag extends Defau
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();

Modified: pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java Wed Jan 18 15:39:31 2017
@@ -29,7 +29,7 @@ import org.apache.pig.classification.Int
 
 /**
  * Common functionality for proactively spilling bags that need to keep the data
- * sorted. 
+ * sorted.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext
         //count for number of objects that have spilled
         if(mSpillFiles == null)
             incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
-        
+
         long spilled = 0;
-        
+
         DataOutputStream out = null;
         try {
             out = getSpillFile();
@@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext
             //sort the tuples
             // as per documentation of collection.sort(), it copies to an array,
             // sorts and copies back to collection
-            // Avoiding that extra copy back to collection (mContents) by 
+            // Avoiding that extra copy back to collection (mContents) by
             // copying to an array and using Arrays.sort
             Tuple[] array = new Tuple[mContents.size()];
             mContents.toArray(array);
             if(comp == null)
                 Arrays.sort(array);
-            else 
+            else
                 Arrays.sort(array,comp);
 
             //dump the array
@@ -89,12 +89,15 @@ public abstract class SortedSpillBag ext
             }
 
             out.flush();
-        } catch (IOException ioe) {
+            out.close();
+            out = null;
+            mContents.clear();
+        } catch (Throwable e) {
             // Remove the last file from the spilled array, since we failed to
             // write to it.
             mSpillFiles.remove(mSpillFiles.size() - 1);
             warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
             return 0;
         } finally {
             if (out != null) {
@@ -105,11 +108,9 @@ public abstract class SortedSpillBag ext
                 }
             }
         }
-        mContents.clear();
-        
         incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
-        
+
         return spilled;
     }
-    
+
 }

Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java Wed Jan 18 15:39:31 2017
@@ -17,17 +17,36 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-
-
-import org.apache.pig.data.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DistinctDataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Spillable;
 import org.junit.After;
 import org.junit.Test;
@@ -36,7 +55,7 @@ import org.junit.Test;
 /**
  * This class will exercise the basic Pig data model and members. It tests for proper behavior in
  * assignment and comparison, as well as function application.
- * 
+ *
  * @author dnm
  */
 public class TestDataBag  {
@@ -590,7 +609,7 @@ public class TestDataBag  {
             }
             mgr.forceSpill();
         }
-        
+
        assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
 
         // Read tuples back, hopefully they come out in the same order.
@@ -719,14 +738,14 @@ public class TestDataBag  {
     @Test
     public void testDefaultBagFactory() throws Exception {
         BagFactory f = BagFactory.getInstance();
-       
+
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
-        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));         
+        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
     }
 
     @Test
@@ -756,7 +775,7 @@ public class TestDataBag  {
         try {
             BagFactory f = BagFactory.getInstance();
         } catch (RuntimeException re) {
-            assertEquals("Expected does not extend BagFactory message", 
+            assertEquals("Expected does not extend BagFactory message",
                 "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
                 re.getMessage());
             caughtIt = true;
@@ -775,7 +794,7 @@ public class TestDataBag  {
 
         BagFactory.resetSelf();
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -789,7 +808,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -804,7 +823,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -820,7 +839,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -837,35 +856,35 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
-    public void testInternalCachedBag() throws Exception {    
+
+    public void testInternalCachedBag() throws Exception {
     	// check adding empty tuple
     	DataBag bg0 = new InternalCachedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalCachedBag(1, 0.5f);
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalCachedBag(1, 0.5f);
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalCachedBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -873,7 +892,7 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         // check iterator
         Iterator<Tuple> iter = bg3.iterator();
         DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -881,7 +900,7 @@ public class TestDataBag  {
         	bg4.add(iter.next());
         }
         assertEquals(bg3, bg4);
-        
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
@@ -894,46 +913,46 @@ public class TestDataBag  {
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
         assertEquals(bg3, bg5);
-        
-        
+
+
         bg4.clear();
-        assertEquals(bg4.size(), 0);        
+        assertEquals(bg4.size(), 0);
     }
-    
-    public void testInternalSortedBag() throws Exception {    
-    	
+
+    public void testInternalSortedBag() throws Exception {
+
     	// check adding empty tuple
     	DataBag bg0 = new InternalSortedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalSortedBag();
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertTrue(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalSortedBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -941,17 +960,17 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         iter = bg3.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
-        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));                
-        
+        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -959,21 +978,21 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalSortedBag();        
+        DataBag bg5 = new InternalSortedBag();
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}     
+        	}
         	bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 9);
         iter = bg5.iterator();
         for(int i=0; i<3; i++) {
@@ -983,21 +1002,21 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<3; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalSortedBag();        
+        DataBag bg6 = new InternalSortedBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-        
+
         assertEquals(bg6.size(), 104*3);
         iter = bg6.iterator();
         for(int i=0; i<104; i++) {
@@ -1007,55 +1026,55 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<104; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new SortedDataBag(null);        
+        DataBag bg7 = new SortedDataBag(null);
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-    
-    public void testInternalDistinctBag() throws Exception {    
+
+    public void testInternalDistinctBag() throws Exception {
     	// check adding empty tuple
     	DataBag bg0 = new InternalDistinctBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 1);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalDistinctBag();
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertTrue(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
     	DataBag bg2 = new InternalDistinctBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalDistinctBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1064,13 +1083,13 @@ public class TestDataBag  {
         }
         assertEquals(bg2, bg3);
         assertEquals(bg3.size(), 3);
-              
-        
+
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalDistinctBag(1, 0.0f);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -1078,73 +1097,73 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalDistinctBag();        
+        DataBag bg5 = new InternalDistinctBag();
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}        
+        	}
         	bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 3);
-    
-        
+
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalDistinctBag();        
+        DataBag bg6 = new InternalDistinctBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-        
-        assertEquals(bg6.size(), 3);       
-        
+
+        assertEquals(bg6.size(), 3);
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new DistinctDataBag();        
+        DataBag bg7 = new DistinctDataBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-    
+
     // See PIG-1231
     @Test
     public void testDataBagIterIdempotent() throws Exception {
         DataBag bg0 = new DefaultDataBag();
         processDataBag(bg0, true);
-        
+
         DataBag bg1 = new DistinctDataBag();
         processDataBag(bg1, true);
-        
+
         DataBag bg2 = new InternalDistinctBag();
         processDataBag(bg2, true);
-        
+
         DataBag bg3 = new InternalSortedBag();
         processDataBag(bg3, true);
-        
+
         DataBag bg4 = new SortedDataBag(null);
         processDataBag(bg4, true);
-        
+
         DataBag bg5 = new InternalCachedBag(0, 0);
         processDataBag(bg5, false);
     }
-    
+
     // See PIG-1285
     @Test
     public void testSerializeSingleTupleBag() throws Exception {
@@ -1159,7 +1178,7 @@ public class TestDataBag  {
         dfBag.readFields(dis);
         assertTrue(dfBag.equals(stBag));
     }
-    
+
     // See PIG-2550
     static class MyCustomTuple extends DefaultTuple {
         private static final long serialVersionUID = 8156382697467819543L;
@@ -1184,7 +1203,23 @@ public class TestDataBag  {
         Tuple t2 = iter.next();
         assertTrue(t2.equals(t));
     }
-    
+
+    // See PIG-4260: a bag built from an Arrays.asList list must still iterate its tuples in order after spill()
+    @Test
+    public void testSpillArrayBackedList() throws Exception {
+        Tuple[] tuples = new Tuple[2];
+        tuples[0] = TupleFactory.getInstance().newTuple(1);
+        tuples[0].set(0, "first");
+        tuples[1] = TupleFactory.getInstance().newTuple(1);
+        tuples[1].set(0, "second");
+        DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
+        bag.spill();
+        Iterator<Tuple> iter = bag.iterator();
+        assertEquals(tuples[0], iter.next());
+        assertEquals(tuples[1], iter.next());
+        assertFalse(iter.hasNext());
+    }
+
     void processDataBag(DataBag bg, boolean doSpill) {
         Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
         bg.add(t);
@@ -1194,7 +1229,7 @@ public class TestDataBag  {
         assertTrue(iter.hasNext());
         iter.next();
         assertFalse(iter.hasNext());
-        assertFalse("hasNext should be idempotent", iter.hasNext());        
+        assertFalse("hasNext should be idempotent", iter.hasNext());
     }
 }