You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/18 15:39:31 UTC
svn commit: r1779347 - in /pig/branches/branch-0.16: ./
src/org/apache/pig/data/ test/org/apache/pig/test/
Author: rohini
Date: Wed Jan 18 15:39:31 2017
New Revision: 1779347
URL: http://svn.apache.org/viewvc?rev=1779347&view=rev
Log:
PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java
pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java
pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java
pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java
pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java
pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 18 15:39:31 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
+
PIG-4918: Pig on Tez cannot switch pig.temp.dir to another fs (daijy)
PIG-5078: Script fails with error - POStoreTez only accepts MROutput (rohini)
Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DefaultAbstractBag.java Wed Jan 18 15:39:31 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
}
@SuppressWarnings("rawtypes")
- protected void warn(String msg, Enum warningEnum, Exception e) {
+ protected void warn(String msg, Enum warningEnum, Throwable e) {
pigLogger = PhysicalOperator.getPigLogger();
if(pigLogger != null) {
pigLogger.warn(this, msg, warningEnum);
Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DefaultDataBag.java Wed Jan 18 15:39:31 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.io.FileNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
public class DefaultDataBag extends DefaultAbstractBag {
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-
+
private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
public boolean isSorted() {
return false;
}
-
+
@Override
public boolean isDistinct() {
return false;
}
-
+
@Override
public Iterator<Tuple> iterator() {
return new DefaultDataBagIterator();
@@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
- } catch (IOException ioe) {
+ out.close();
+ out = null;
+ mContents.clear();
+ } catch (Throwable e) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
return 0;
} finally {
if (out != null) {
@@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa
}
}
}
- mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
if (hasCachedTuple)
return (mBuf != null);
@@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa
log.fatal(msg, eof);
throw new RuntimeException(msg, eof);
} catch (IOException ioe) {
- String msg = "Unable to read our spill file.";
+ String msg = "Unable to read our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
}
@@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa
log.warn("Failed to close spill file.", e);
}
} catch (IOException ioe) {
- String msg = "Unable to read our spill file.";
+ String msg = "Unable to read our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/DistinctDataBag.java Wed Jan 18 15:39:31 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
public boolean isSorted() {
return false;
}
-
+
@Override
public boolean isDistinct() {
return true;
}
-
-
+
+
@Override
public long size() {
if (mSpillFiles != null && mSpillFiles.size() > 0){
- //We need to racalculate size to guarantee a count of unique
+ //We need to racalculate size to guarantee a count of unique
//entries including those on disk
Iterator<Tuple> iter = iterator();
int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
newSize++;
iter.next();
}
-
+
synchronized(mContents) {
//we don't want adds to change our numbers
//the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
}
return mSize;
}
-
-
+
+
@Override
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
@@ -155,12 +155,15 @@ public class DistinctDataBag extends Def
}
}
out.flush();
- } catch (IOException ioe) {
+ out.close();
+ out = null;
+ mContents.clear();
+ } catch (Throwable e) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
return 0;
} finally {
if (out != null) {
@@ -171,7 +174,6 @@ public class DistinctDataBag extends Def
}
}
}
- mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -208,7 +210,7 @@ public class DistinctDataBag extends Def
@Override
public int hashCode() {
- return tuple.hashCode();
+ return tuple.hashCode();
}
}
@@ -237,7 +239,7 @@ public class DistinctDataBag extends Def
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
@@ -295,7 +297,7 @@ public class DistinctDataBag extends Def
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -346,7 +348,7 @@ public class DistinctDataBag extends Def
Iterator<File> i = mSpillFiles.iterator();
while (i.hasNext()) {
try {
- DataInputStream in =
+ DataInputStream in =
new DataInputStream(new BufferedInputStream(
new FileInputStream(i.next())));
mStreams.add(in);
@@ -502,7 +504,7 @@ public class DistinctDataBag extends Def
addToQueue(null, mStreams.size() - 1);
i.remove();
filesToDelete.add(f);
-
+
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
@@ -545,7 +547,7 @@ public class DistinctDataBag extends Def
log.warn("Failed to delete spill file: " + f.getPath());
}
}
-
+
// clear the list, so that finalize does not delete any files,
// when mSpillFiles is assigned a new value
mSpillFiles.clear();
@@ -560,6 +562,6 @@ public class DistinctDataBag extends Def
}
}
}
-
+
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/SortedDataBag.java Wed Jan 18 15:39:31 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.PriorityQueue;
-
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
* stored unsorted as it comes in, and only sorted when it is time to dump
* it to a file or when the first iterator is requested. Experementation
* found this to be the faster than storing it sorted to begin with.
- *
+ *
* We allow a user defined comparator, but provide a default comparator in
* cases where the user doesn't specify one.
*/
public class SortedDataBag extends DefaultAbstractBag{
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
@Override
public int hashCode() {
- return 42;
+ return 42;
}
}
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
public boolean isSorted() {
return true;
}
-
+
@Override
public boolean isDistinct() {
return false;
}
-
+
@Override
public Iterator<Tuple> iterator() {
return new SortedDataBagIterator();
@@ -145,12 +145,15 @@ public class SortedDataBag extends Defau
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
- } catch (IOException ioe) {
+ out.close();
+ out = null;
+ mContents.clear();
+ } catch (Throwable e) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
return 0;
} finally {
if (out != null) {
@@ -161,7 +164,6 @@ public class SortedDataBag extends Defau
}
}
}
- mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -203,7 +205,7 @@ public class SortedDataBag extends Defau
@Override
public int hashCode() {
- return tuple.hashCode();
+ return tuple.hashCode();
}
}
@@ -228,7 +230,7 @@ public class SortedDataBag extends Defau
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
@@ -341,7 +343,7 @@ public class SortedDataBag extends Defau
Iterator<File> i = mSpillFiles.iterator();
while (i.hasNext()) {
try {
- DataInputStream in =
+ DataInputStream in =
new DataInputStream(new BufferedInputStream(
new FileInputStream(i.next())));
mStreams.add(in);
@@ -351,7 +353,7 @@ public class SortedDataBag extends Defau
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -411,7 +413,7 @@ public class SortedDataBag extends Defau
in.close();
}catch(IOException e) {
log.warn("Failed to close spill file.", e);
- }
+ }
mStreams.set(fileNum, null);
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
@@ -518,7 +520,7 @@ public class SortedDataBag extends Defau
log.warn("Failed to delete spill file: " + f.getPath());
}
}
-
+
// clear the list, so that finalize does not delete any files,
// when mSpillFiles is assigned a new value
mSpillFiles.clear();
Modified: pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/data/SortedSpillBag.java Wed Jan 18 15:39:31 2017
@@ -29,7 +29,7 @@ import org.apache.pig.classification.Int
/**
* Common functionality for proactively spilling bags that need to keep the data
- * sorted.
+ * sorted.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext
//count for number of objects that have spilled
if(mSpillFiles == null)
incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
-
+
long spilled = 0;
-
+
DataOutputStream out = null;
try {
out = getSpillFile();
@@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext
//sort the tuples
// as per documentation of collection.sort(), it copies to an array,
// sorts and copies back to collection
- // Avoiding that extra copy back to collection (mContents) by
+ // Avoiding that extra copy back to collection (mContents) by
// copying to an array and using Arrays.sort
Tuple[] array = new Tuple[mContents.size()];
mContents.toArray(array);
if(comp == null)
Arrays.sort(array);
- else
+ else
Arrays.sort(array,comp);
//dump the array
@@ -89,12 +89,15 @@ public abstract class SortedSpillBag ext
}
out.flush();
- } catch (IOException ioe) {
+ out.close();
+ out = null;
+ mContents.clear();
+ } catch (Throwable e) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
return 0;
} finally {
if (out != null) {
@@ -105,11 +108,9 @@ public abstract class SortedSpillBag ext
}
}
}
- mContents.clear();
-
incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
-
+
return spilled;
}
-
+
}
Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java?rev=1779347&r1=1779346&r2=1779347&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestDataBag.java Wed Jan 18 15:39:31 2017
@@ -17,17 +17,36 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import java.util.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-
-
-import org.apache.pig.data.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DistinctDataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Spillable;
import org.junit.After;
import org.junit.Test;
@@ -36,7 +55,7 @@ import org.junit.Test;
/**
* This class will exercise the basic Pig data model and members. It tests for proper behavior in
* assignment and comparison, as well as function application.
- *
+ *
* @author dnm
*/
public class TestDataBag {
@@ -590,7 +609,7 @@ public class TestDataBag {
}
mgr.forceSpill();
}
-
+
assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
// Read tuples back, hopefully they come out in the same order.
@@ -719,14 +738,14 @@ public class TestDataBag {
@Test
public void testDefaultBagFactory() throws Exception {
BagFactory f = BagFactory.getInstance();
-
+
DataBag bag = f.newDefaultBag();
DataBag sorted = f.newSortedBag(null);
DataBag distinct = f.newDistinctBag();
assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
- assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+ assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
}
@Test
@@ -756,7 +775,7 @@ public class TestDataBag {
try {
BagFactory f = BagFactory.getInstance();
} catch (RuntimeException re) {
- assertEquals("Expected does not extend BagFactory message",
+ assertEquals("Expected does not extend BagFactory message",
"Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
re.getMessage());
caughtIt = true;
@@ -775,7 +794,7 @@ public class TestDataBag {
BagFactory.resetSelf();
}
-
+
@Test
public void testNonSpillableDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -789,7 +808,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testNonSpillableDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -804,7 +823,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -820,7 +839,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -837,35 +856,35 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
- public void testInternalCachedBag() throws Exception {
+
+ public void testInternalCachedBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalCachedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalCachedBag(1, 0.5f);
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalCachedBag(1, 0.5f);
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
// check bag with data written to disk
DataBag bg3 = new InternalCachedBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -873,7 +892,7 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
// check iterator
Iterator<Tuple> iter = bg3.iterator();
DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -881,7 +900,7 @@ public class TestDataBag {
bg4.add(iter.next());
}
assertEquals(bg3, bg4);
-
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
@@ -894,46 +913,46 @@ public class TestDataBag {
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
assertEquals(bg3, bg5);
-
-
+
+
bg4.clear();
- assertEquals(bg4.size(), 0);
+ assertEquals(bg4.size(), 0);
}
-
- public void testInternalSortedBag() throws Exception {
-
+
+ public void testInternalSortedBag() throws Exception {
+
// check adding empty tuple
DataBag bg0 = new InternalSortedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalSortedBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertTrue(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalSortedBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -941,17 +960,17 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
iter = bg3.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -959,21 +978,21 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalSortedBag();
+ DataBag bg5 = new InternalSortedBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 9);
iter = bg5.iterator();
for(int i=0; i<3; i++) {
@@ -983,21 +1002,21 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<3; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalSortedBag();
+ DataBag bg6 = new InternalSortedBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
+
assertEquals(bg6.size(), 104*3);
iter = bg6.iterator();
for(int i=0; i<104; i++) {
@@ -1007,55 +1026,55 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<104; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new SortedDataBag(null);
+ DataBag bg7 = new SortedDataBag(null);
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
- public void testInternalDistinctBag() throws Exception {
+
+ public void testInternalDistinctBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalDistinctBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 1);
-
+
// check equal of bags
DataBag bg1 = new InternalDistinctBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertTrue(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
DataBag bg2 = new InternalDistinctBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalDistinctBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1064,13 +1083,13 @@ public class TestDataBag {
}
assertEquals(bg2, bg3);
assertEquals(bg3.size(), 3);
-
-
+
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalDistinctBag(1, 0.0f);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -1078,73 +1097,73 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalDistinctBag();
+ DataBag bg5 = new InternalDistinctBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 3);
-
-
+
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalDistinctBag();
+ DataBag bg6 = new InternalDistinctBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
- assertEquals(bg6.size(), 3);
-
+
+ assertEquals(bg6.size(), 3);
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new DistinctDataBag();
+ DataBag bg7 = new DistinctDataBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
+
// See PIG-1231
@Test
public void testDataBagIterIdempotent() throws Exception {
DataBag bg0 = new DefaultDataBag();
processDataBag(bg0, true);
-
+
DataBag bg1 = new DistinctDataBag();
processDataBag(bg1, true);
-
+
DataBag bg2 = new InternalDistinctBag();
processDataBag(bg2, true);
-
+
DataBag bg3 = new InternalSortedBag();
processDataBag(bg3, true);
-
+
DataBag bg4 = new SortedDataBag(null);
processDataBag(bg4, true);
-
+
DataBag bg5 = new InternalCachedBag(0, 0);
processDataBag(bg5, false);
}
-
+
// See PIG-1285
@Test
public void testSerializeSingleTupleBag() throws Exception {
@@ -1159,7 +1178,7 @@ public class TestDataBag {
dfBag.readFields(dis);
assertTrue(dfBag.equals(stBag));
}
-
+
// See PIG-2550
static class MyCustomTuple extends DefaultTuple {
private static final long serialVersionUID = 8156382697467819543L;
@@ -1184,7 +1203,23 @@ public class TestDataBag {
Tuple t2 = iter.next();
assertTrue(t2.equals(t));
}
-
+
+ // See PIG-4260
+ @Test
+ public void testSpillArrayBackedList() throws Exception {
+ Tuple[] tuples = new Tuple[2];
+ tuples[0] = TupleFactory.getInstance().newTuple(1);
+ tuples[0].set(0, "first");
+ tuples[1] = TupleFactory.getInstance().newTuple(1);
+ tuples[1].set(0, "second");
+ DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
+ bag.spill();
+ Iterator<Tuple> iter = bag.iterator();
+ assertEquals(tuples[0], iter.next());
+ assertEquals(tuples[1], iter.next());
+ assertFalse(iter.hasNext());
+ }
+
void processDataBag(DataBag bg, boolean doSpill) {
Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
bg.add(t);
@@ -1194,7 +1229,7 @@ public class TestDataBag {
assertTrue(iter.hasNext());
iter.next();
assertFalse(iter.hasNext());
- assertFalse("hasNext should be idempotent", iter.hasNext());
+ assertFalse("hasNext should be idempotent", iter.hasNext());
}
}