You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/08 17:20:46 UTC
svn commit: r610047 - in /incubator/pig/trunk/src/org/apache/pig/data:
DataBag.java DefaultDataBag.java DistinctDataBag.java SortedDataBag.java
Author: gates
Date: Tue Jan 8 08:20:45 2008
New Revision: 610047
URL: http://svn.apache.org/viewvc?rev=610047&view=rev
Log:
PIG-30: Added a hashCode() implementation to DataBag and added comments throughout the DataBag implementation and its subclasses.
Modified:
incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Tue Jan 8 08:20:45 2008
@@ -32,7 +32,38 @@
import org.apache.pig.impl.mapreduceExec.PigMapReduce;
/**
- * A collection of Tuples
+ * A collection of Tuples. A DataBag may or may not fit into memory.
+ * DataBag implements Spillable, which means that it registers with a memory
+ * manager. By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out. This may happen multiple times. The bag
+ * tracks all of the files it's spilled to.
+ *
+ * DataBag provides an Iterator interface that allows callers to read
+ * through the contents. The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read. That is, a DataBag cannot be used as a queue. If data is written
+ * after data is read, the results are undefined. This condition is not
+ * checked on each add or read, for reasons of speed. Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized. This means that reading from a DataBag is currently
+ * serialized. This is ok for the moment because pig execution is
+ * currently single threaded. A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronized keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threaded execution.
+ *
+ * DataBags come in several types: default, sorted, and distinct. The type
+ * must be chosen up front; there is no way to convert a bag on the fly.
*/
public abstract class DataBag extends Datum implements Spillable {
// Container that holds the tuples. Actual object instantiated by
@@ -170,6 +201,10 @@
}
}
+ /**
+ * This method is potentially very expensive since it may require a
+ * sort of the bag; don't call it unless you have to.
+ */
public int compareTo(Object other) {
// Do we really need to be able to compare to DataAtom and Tuple?
// When does that happen?
@@ -182,9 +217,31 @@
else return -1;
}
- // Don't sort them, just go tuple by tuple.
- Iterator<Tuple> thisIt = this.iterator();
- Iterator<Tuple> otherIt = bOther.iterator();
+ // Two bags compare equal if they hold the same tuples, regardless of
+ // order, so both sides must be walked in a canonical (sorted) order.
+ // This is expensive; hopefully the size check above short-circuits
+ // most comparisons.
+ // SortedDataBag and DistinctDataBag are assumed to iterate in sorted
+ // order already; any other bag is copied into a SortedDataBag first.
+ // NOTE(review): a SortedDataBag built with a non-default comparator
+ // may not iterate in DefaultComparator order -- confirm.
+ DataBag thisClone;
+ DataBag otherClone;
+ if (this instanceof SortedDataBag ||
+ this instanceof DistinctDataBag) {
+ thisClone = this;
+ } else {
+ thisClone = new SortedDataBag(null);
+ Iterator<Tuple> i = iterator();
+ while (i.hasNext()) thisClone.add(i.next());
+ }
+ // Check the type of the *other* bag here (previously this tested
+ // `this`, so an unsorted other bag could skip the sort).
+ if (bOther instanceof SortedDataBag ||
+ bOther instanceof DistinctDataBag) {
+ otherClone = bOther;
+ } else {
+ otherClone = new SortedDataBag(null);
+ Iterator<Tuple> i = bOther.iterator();
+ while (i.hasNext()) otherClone.add(i.next());
+ }
+ Iterator<Tuple> thisIt = thisClone.iterator();
+ Iterator<Tuple> otherIt = otherClone.iterator();
while (thisIt.hasNext() && otherIt.hasNext()) {
Tuple thisT = thisIt.next();
Tuple otherT = otherIt.next();
@@ -203,6 +260,7 @@
}
}
+ /**
+ * Returns true when compareTo(other) returns 0, i.e. the bags contain
+ * the same tuples regardless of order. This can be expensive, since
+ * compareTo() may have to sort both bags.
+ */
+ @Override
public boolean equals(Object other) {
return compareTo(other) == 0;
}
@@ -270,6 +328,17 @@
}
sb.append('}');
return sb.toString();
+ }
+
+ /**
+ * Hash code consistent with equals(). equals() delegates to
+ * compareTo(), which ignores tuple order (it compares sorted copies),
+ * so element hashes are summed: the result is the same for any
+ * iteration order. Iterates the whole bag, so it may read spill files.
+ */
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ Iterator<Tuple> i = iterator();
+ while (i.hasNext()) {
+ // Addition is commutative, unlike 37 * hash + h, so two equal
+ // bags filled in different orders hash identically.
+ hash += i.next().hashCode();
+ }
+ return hash;
}
/**
Modified: incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Tue Jan 8 08:20:45 2008
@@ -31,7 +31,9 @@
/**
- * An unordered collection of Tuples (possibly) with multiples.
+ * An unordered collection of Tuples (possibly) with multiples. The tuples
+ * are stored in an ArrayList, since there is no concern for order or
+ * distinctness.
*/
public class DefaultDataBag extends DataBag {
@@ -69,7 +71,7 @@
while (i.hasNext()) {
i.next().write(out);
spilled++;
- // This will spill every 16383 records.
+ // This will report progress every 16384 records.
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
Modified: incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Jan 8 08:20:45 2008
@@ -222,6 +222,10 @@
// from memory that were already in the queue will be fine,
// as they're guaranteed to be ahead of the point we fast
// foward to.
+ // We're guaranteed that the file we want to read from for
+ // the fast forward is the last element in mSpillFiles,
+ // because we don't support calls to add() after calls to
+ // iterator(), and spill() won't create empty files.
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(mSpillFiles.get(
Modified: incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Jan 8 08:20:45 2008
@@ -40,9 +40,13 @@
/**
* An ordered collection of Tuples (possibly) with multiples. Data is
- * stored unsorted as it comes in, and only sorted when it is time to dump
+ * stored unsorted in an ArrayList as it comes in, and only sorted when it
+ * is time to dump
* it to a file or when the first iterator is requested. Experementation
* found this to be the faster than storing it sorted to begin with.
+ *
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
*/
public class SortedDataBag extends DataBag {
private Comparator<Tuple> mComp;
@@ -59,6 +63,11 @@
}
+ /**
+ * @param spec EvalSpec to use to do the sorting. spec.getComparator()
+ * will be called to populate our mComp field. If null,
+ * DefaultComparator will be used.
+ */
public SortedDataBag(EvalSpec spec) {
if (spec == null) {
mComp = new DefaultComparator();
@@ -97,8 +106,14 @@
DataOutputStream out = getSpillFile();
// Have to sort the data before we can dump it. It's bogus
// that we have to do this under the lock, but there's no way
- // around it.
- Collections.sort((ArrayList<Tuple>)mContents, mComp);
+ // around it. If reads have already started, the data has
+ // already been sorted, so there is no reason to sort again.
+ // Don't set mReadStarted here, because we could still be in
+ // the add phase, in which case more (unsorted) tuples will be
+ // added later.
+ if (!mReadStarted) {
+ Collections.sort((ArrayList<Tuple>)mContents, mComp);
+ }
Iterator<Tuple> i = mContents.iterator();
while (i.hasNext()) {
i.next().write(out);
@@ -130,6 +145,12 @@
*/
private class SortedDataBagIterator implements Iterator<Tuple> {
+ /**
+ * A container to hold tuples in a priority queue. Stores the
+ * file number the tuple came from, so that when the tuple is read
+ * out of the queue, we know which file to read its replacement
+ * tuple from.
+ */
private class PQContainer implements Comparable<PQContainer> {
public Tuple tuple;
public int fileNum;
@@ -199,6 +220,10 @@
// from memory that were already in the queue will be fine,
// as they're guaranteed to be ahead of the point we fast
// foward to.
+ // We're guaranteed that the file we want to read from for
+ // the fast forward is the last element in mSpillFiles,
+ // because we don't support calls to add() after calls to
+ // iterator(), and spill() won't create empty files.
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(mSpillFiles.get(