You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/08 17:20:46 UTC

svn commit: r610047 - in /incubator/pig/trunk/src/org/apache/pig/data: DataBag.java DefaultDataBag.java DistinctDataBag.java SortedDataBag.java

Author: gates
Date: Tue Jan  8 08:20:45 2008
New Revision: 610047

URL: http://svn.apache.org/viewvc?rev=610047&view=rev
Log:
PIG-30 Added hashCode() implementation to DataBag and added additional comments throughout DataBag and extenders implementation.


Modified:
    incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java

Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Tue Jan  8 08:20:45 2008
@@ -32,7 +32,38 @@
 import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 /**
- * A collection of Tuples
+ * A collection of Tuples.  A DataBag may or may not fit into memory.
+ * DataBag implements Spillable, which means that it registers with a memory
+ * manager.  By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out.  This may happen multiple times.  The bag
+ * tracks all of the files it's spilled to.
+ * 
+ * DataBag provides an Iterator interface, that allows callers to read
+ * through the contents.  The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read.  That is, a DataBag cannot be used as a queue.  If data is written
+ * after data is read, the results are undefined.  This condition is not
+ * checked on each add or read, for reasons of speed.  Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized.  This means that reading from a DataBag is currently
+ * serialized.  This is ok for the moment because pig execution is
+ * currently single threaded.  A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronize keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threading execution.
+ *
+ * DataBags come in several types: default, sorted, and distinct.  The type
+ * must be chosen up front; there is no way to convert a bag on the fly.
  */
 public abstract class DataBag extends Datum implements Spillable {
     // Container that holds the tuples. Actual object instantiated by
@@ -170,6 +201,10 @@
         }
     }
 
+    /**
+     * This method is potentially very expensive since it may require a
+     * sort of the bag; don't call it unless you have to.
+     */
     public int compareTo(Object other) {
         // Do we really need to be able to compare to DataAtom and Tuple?
         // When does that happen?
@@ -182,9 +217,31 @@
                 else return -1;
             }
 
-            // Don't sort them, just go tuple by tuple.
-            Iterator<Tuple> thisIt = this.iterator();
-            Iterator<Tuple> otherIt = bOther.iterator();
+            // Ugh, this is bogus.  But I have to know if two bags have the
+            // same tuples, regardless of order.  Hopefully most of the
+            // time the size check above will prevent this.
+            // If either bag isn't already sorted, create a sorted bag out
+            // of it so I can guarantee order.
+            DataBag thisClone;
+            DataBag otherClone;
+            if (this instanceof SortedDataBag ||
+                    this instanceof DistinctDataBag) {
+                thisClone = this;
+            } else {
+                thisClone = new SortedDataBag(null);
+                Iterator<Tuple> i = iterator();
+                while (i.hasNext()) thisClone.add(i.next());
+            }
+            // Decide based on the type of the OTHER bag: whether it has to
+            // be copied into a sorted bag depends on its own type, not on
+            // the type of this bag.
+            if (other instanceof SortedDataBag ||
+                    other instanceof DistinctDataBag) {
+                otherClone = bOther;
+            } else {
+                otherClone = new SortedDataBag(null);
+                Iterator<Tuple> i = bOther.iterator();
+                while (i.hasNext()) otherClone.add(i.next());
+            }
+            Iterator<Tuple> thisIt = thisClone.iterator();
+            Iterator<Tuple> otherIt = otherClone.iterator();
             while (thisIt.hasNext() && otherIt.hasNext()) {
                 Tuple thisT = thisIt.next();
                 Tuple otherT = otherIt.next();
@@ -203,6 +260,7 @@
         }
     }
 
+    @Override
     public boolean equals(Object other) {
         // Equality is defined entirely by compareTo(): the two objects are
         // equal exactly when it reports 0.  compareTo() may have to build
         // sorted copies of the bags, so this check can be expensive.
         return compareTo(other) == 0;
     }
@@ -270,6 +328,17 @@
         }
         sb.append('}');
         return sb.toString();
+    }
+
+    /**
+     * Computes a hash code that does not depend on the order in which the
+     * tuples are iterated.  equals() is defined through compareTo(), which
+     * compares bags regardless of tuple order, so two bags holding the same
+     * tuples in a different order are equal and must hash identically.
+     * This walks the entire bag, so it can be expensive for large bags.
+     *
+     * @return a hash built from the hash codes of all contained tuples
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        Iterator<Tuple> i = iterator();
+        while (i.hasNext()) {
+            // Addition is commutative, so the iteration order cannot
+            // change the result.  Scale by 37 because we want a prime,
+            // and tuple uses 31.
+            hash += 37 * i.next().hashCode();
+        }
+        return hash;
     }
 
     /**

Modified: incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Tue Jan  8 08:20:45 2008
@@ -31,7 +31,9 @@
 
 
 /**
- * An unordered collection of Tuples (possibly) with multiples.
+ * An unordered collection of Tuples (possibly) with multiples.  The tuples
+ * are stored in an ArrayList, since there is no concern for order or
+ * distinctness.
  */
 public class DefaultDataBag extends DataBag {
 
@@ -69,7 +71,7 @@
                 while (i.hasNext()) {
                     i.next().write(out);
                     spilled++;
-                    // This will spill every 16383 records.
+                    // This will report progress every 16384 records.
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();

Modified: incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Jan  8 08:20:45 2008
@@ -222,6 +222,10 @@
                 // from memory that were already in the queue will be fine,
                 // as they're guaranteed to be ahead of the point we fast
                 // forward to.
+                // We're guaranteed that the file we want to read from for
+                // the fast forward is the last element in mSpillFiles,
+                // because we don't support calls to add() after calls to
+                // iterator(), and spill() won't create empty files.
                 try {
                     in = new DataInputStream(new BufferedInputStream(
                         new FileInputStream(mSpillFiles.get(

Modified: incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Jan  8 08:20:45 2008
@@ -40,9 +40,13 @@
 
 /**
  * An ordered collection of Tuples (possibly) with multiples.  Data is
- * stored unsorted as it comes in, and only sorted when it is time to dump
+ * stored unsorted in an ArrayList as it comes in, and only sorted when it
+ * is time to dump
  * it to a file or when the first iterator is requested.  Experimentation
  * found this to be faster than storing it sorted to begin with.
+ * 
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DataBag {
     private Comparator<Tuple> mComp;
@@ -59,6 +63,11 @@
 
     }
 
+    /**
+     * @param spec EvalSpec to use to do the sorting. spec.getComparator()
+     * will be called to populate our mComp field.  If null,
+     * DefaultComparator will be used.
+     */
     public SortedDataBag(EvalSpec spec) {
         if (spec == null) {
             mComp = new DefaultComparator();
@@ -97,8 +106,14 @@
                 DataOutputStream out = getSpillFile();
                 // Have to sort the data before we can dump it.  It's bogus
                 // that we have to do this under the lock, but there's no way
-                // around it.
-                Collections.sort((ArrayList<Tuple>)mContents, mComp);
+                // around it.  If the reads already started, then we've
+                // already sorted it.  No reason to do it again.  Don't
+                // set mReadStarted, because we could still be in the add
+                // phase, in which case more (unsorted) will be added
+                // later.
+                if (!mReadStarted) {
+                    Collections.sort((ArrayList<Tuple>)mContents, mComp);
+                }
                 Iterator<Tuple> i = mContents.iterator();
                 while (i.hasNext()) {
                     i.next().write(out);
@@ -130,6 +145,12 @@
      */
     private class SortedDataBagIterator implements Iterator<Tuple> {
 
+        /**
+         * A container to hold tuples in a priority queue.  Stores the
+         * file number the tuple came from, so that when the tuple is read
+         * out of the queue, we know which file to read its replacement
+         * tuple from.
+         */
         private class PQContainer implements Comparable<PQContainer> {
             public Tuple tuple;
             public int fileNum;
@@ -199,6 +220,10 @@
                 // from memory that were already in the queue will be fine,
                 // as they're guaranteed to be ahead of the point we fast
                 // forward to.
+                // We're guaranteed that the file we want to read from for
+                // the fast forward is the last element in mSpillFiles,
+                // because we don't support calls to add() after calls to
+                // iterator(), and spill() won't create empty files.
                 try {
                     in = new DataInputStream(new BufferedInputStream(
                         new FileInputStream(mSpillFiles.get(