Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/04 23:58:23 UTC
svn commit: r609048 [1/2] - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/data/
src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/
src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/collec...
Author: gates
Date: Fri Jan 4 14:58:20 2008
New Revision: 609048
URL: http://svn.apache.org/viewvc?rev=609048&view=rev
Log:
PIG-30: Rewrote DataBags to better handle decisions of when to spill to
disk and to spill more intelligently.
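
For context before the file-by-file diff: bag construction now goes through BagFactory, and the old content()/cardinality() accessors give way to iterator()/size(), which is what most of the small per-file changes below amount to. A minimal sketch of client code against the new surface (the class name and the empty placeholder tuples are illustrative only, not part of the commit):

    import java.util.Iterator;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class BagAccessSketch {
        public static void main(String[] args) {
            // Bags are no longer constructed directly; the factory registers
            // them with the SpillableMemoryManager so they can spill to disk.
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for (int i = 0; i < 3; i++) {
                bag.add(new Tuple());                 // placeholder tuples
            }

            System.out.println("size = " + bag.size());              // was cardinality()
            for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) { // was content()
                System.out.println(it.next());
            }
        }
    }
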
Added:
incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java
incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java
Removed:
incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java
incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java
incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java
incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java
incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java
incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java
incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java
incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
incubator/pig/trunk/src/org/apache/pig/data/DataMap.java
incubator/pig/trunk/src/org/apache/pig/data/Datum.java
incubator/pig/trunk/src/org/apache/pig/data/Tuple.java
incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java
incubator/pig/trunk/test/org/apache/pig/test/Util.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Jan 4 14:58:20 2008
@@ -58,3 +58,7 @@
PIG-41: Added patterns to svn:ignore
PIG-51: Fixed combiner in the presence of flattening
+
+ PIG-30: Rewrote DataBags to better handle decisions of when to spill to
+ disk and to spill more intelligently. (gates)
+
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri Jan 4 14:58:20 2008
@@ -57,6 +57,7 @@
import org.apache.pig.impl.physicalLayer.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.POVisitor;
import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.util.PigLogger;
@@ -172,7 +173,7 @@
logMessage += (logMessage + urls.nextElement() + "; ");
}
- pigContext.getLogger().debug(logMessage);
+ PigLogger.getLogger().debug(logMessage);
}
return resourceLocation;
@@ -302,7 +303,7 @@
pp = physicalPlans.get(readFrom);
}
- return pp.exec(continueFromLast).content();
+ return pp.exec(continueFromLast).iterator();
}
@@ -319,7 +320,7 @@
readFrom.compile(queryResults);
readFrom.exec();
if (pigContext.getExecType() == ExecType.LOCAL)
- return readFrom.read().content();
+ return readFrom.read().iterator();
final LoadFunc p;
try{
@@ -534,7 +535,7 @@
stream.println("Logical Plan:");
IntermedResult ir = queryResults.get(alias);
if (ir == null) {
- pigContext.getLogger().error("Invalid alias: " + alias);
+ PigLogger.getLogger().error("Invalid alias: " + alias);
throw new IOException("Invalid alias: " + alias);
}
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java Fri Jan 4 14:58:20 2008
@@ -106,7 +106,7 @@
double sum = 0;
double count = 0;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
Tuple t = (Tuple) it.next();
// if(!(t.getField(0) instanceof DataAtom)) {
// throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
@@ -124,14 +124,14 @@
DataBag values = input.getBagField(0);
- return values.cardinality();
+ return values.size();
}
static protected double sum(Tuple input) throws IOException {
DataBag values = input.getBagField(0);
double sum = 0;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
Tuple t = (Tuple) it.next();
sum += t.getAtomField(0).numval();
}
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java Fri Jan 4 14:58:20 2008
@@ -77,7 +77,7 @@
static protected long count(Tuple input) throws IOException {
Datum values = input.getField(0);
if (values instanceof DataBag)
- return ((DataBag)values).cardinality();
+ return ((DataBag)values).size();
else if (values instanceof DataMap)
return ((DataMap)values).cardinality();
else
@@ -87,7 +87,7 @@
static protected long sum(Tuple input) throws IOException {
DataBag values = input.getBagField(0);
long sum = 0;
- for (Iterator<Tuple> it = values.content(); it.hasNext();) {
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
try {
sum += t.getAtomField(0).longVal();
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java Fri Jan 4 14:58:20 2008
@@ -48,9 +48,9 @@
if (input.getField(0) instanceof DataBag) {
DataBag field1 = input.getBagField(0);
DataBag field2 = input.getBagField(1);
- Iterator<Tuple> it1 = field1.content();
+ Iterator<Tuple> it1 = field1.iterator();
checkInBag(field2, it1, output);
- Iterator<Tuple> it2 = field2.content();
+ Iterator<Tuple> it2 = field2.iterator();
checkInBag(field1, it2, output);
} else {
DataAtom d1 = input.getAtomField(0);
@@ -65,7 +65,7 @@
private void checkInBag(DataBag bag, Iterator<Tuple> iterator, DataBag emitTo) throws IOException {
while(iterator.hasNext()) {
Tuple t = iterator.next();
- Iterator<Tuple> it2 = bag.content();
+ Iterator<Tuple> it2 = bag.iterator();
boolean found = false;
while(it2.hasNext()) {
if (t.equals(it2.next())) {
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java Fri Jan 4 14:58:20 2008
@@ -32,7 +32,7 @@
public boolean exec(Tuple input) throws IOException {
Datum values = input.getField(0);
if (values instanceof DataBag)
- return ((DataBag)values).cardinality() == 0;
+ return ((DataBag)values).size() == 0;
else if (values instanceof DataMap)
return ((DataMap)values).cardinality() == 0;
else
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java Fri Jan 4 14:58:20 2008
@@ -68,7 +68,7 @@
DataBag values = input.getBagField(0);
double curMax = Double.NEGATIVE_INFINITY;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
Tuple t = (Tuple) it.next();
try {
curMax = java.lang.Math.max(curMax, t.getAtomField(0).numval());
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java Fri Jan 4 14:58:20 2008
@@ -68,7 +68,7 @@
DataBag values = input.getBagField(0);
double curMin = Double.POSITIVE_INFINITY;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
Tuple t = (Tuple) it.next();
try {
curMin = java.lang.Math.min(curMin, t.getAtomField(0).numval());
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java Fri Jan 4 14:58:20 2008
@@ -70,13 +70,13 @@
double sum = 0;
int i = 0;
Tuple t = null;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
try {
t = (Tuple) it.next();
i++;
sum += t.getAtomField(0).numval();
}catch(RuntimeException exp) {
- String msg = "iteration = " + i + "bag size = " + values.cardinality() + " partial sum = " + sum + "\n";
+ String msg = "iteration = " + i + "bag size = " + values.size() + " partial sum = " + sum + "\n";
if (t != null)
msg += "previous tupple = " + t.toString();
throw new RuntimeException(exp.getMessage() + " additional info: " + msg);
Modified: incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java Fri Jan 4 14:58:20 2008
@@ -17,43 +17,58 @@
*/
package org.apache.pig.data;
-import java.io.File;
-import java.io.IOException;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+/**
+ * A bag factory. Can be used to generate different types of bags
+ * depending on what is needed.
+ */
public class BagFactory {
+ private static BagFactory gSelf;
+ private static SpillableMemoryManager gMemMgr;
- private File tmpdir;
- private static BagFactory instance = new BagFactory();
-
- static{
- init(new File(System.getProperty("java.io.tmpdir")));
- }
+ static { gSelf = new BagFactory(); }
+
+ /**
+ * Get a reference to the singleton factory.
+ */
public static BagFactory getInstance() {
- return instance;
- }
-
- private BagFactory() {
- }
-
- public static void init(File tmpdir) {
- instance.setTmpDir(tmpdir);
- }
-
- private void setTmpDir(File tmpdir) {
- this.tmpdir = tmpdir;
- this.tmpdir.mkdirs();
+ return gSelf;
}
- // Get BigBag or Bag, depending on whether the temp directory has been set up
- public DataBag getNewBag() throws IOException {
- if (tmpdir == null) return new DataBag();
- else return getNewBigBag();
+ /**
+ * Get a default (unordered, not distinct) data bag.
+ */
+ public DataBag newDefaultBag() {
+ DataBag b = new DefaultDataBag();
+ gMemMgr.registerSpillable(b);
+ return b;
+ }
+
+ /**
+ * Get a sorted data bag.
+ * @param spec EvalSpec that controls how the data is sorted.
+ * If null, default comparator will be used.
+ */
+ public DataBag newSortedBag(EvalSpec spec) {
+ DataBag b = new SortedDataBag(spec);
+ gMemMgr.registerSpillable(b);
+ return b;
}
- // Need a Big Bag, dammit!
- public BigDataBag getNewBigBag() throws IOException {
- if (tmpdir == null) throw new IOException("No temp directory given for BigDataBag.");
- else return new BigDataBag(tmpdir);
+ /**
+ * Get a distinct data bag.
+ */
+ public DataBag newDistinctBag() {
+ DataBag b = new DistinctDataBag();
+ gMemMgr.registerSpillable(b);
+ return b;
+ }
+
+ private BagFactory() {
+ gMemMgr = new SpillableMemoryManager();
}
}
+
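
For reference, a sketch of how the three factory methods above differ in the bags they hand back; the class name is illustrative, and passing null to newSortedBag() falls back to the default comparator per the SortedDataBag constructor later in this change:

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;

    public class BagKindsSketch {
        public static void main(String[] args) {
            BagFactory f = BagFactory.getInstance();

            DataBag plain    = f.newDefaultBag();     // unordered, keeps duplicates
            DataBag sorted   = f.newSortedBag(null);  // null EvalSpec -> default Tuple comparator
            DataBag distinct = f.newDistinctBag();    // duplicate tuples dropped on add()

            System.out.println(plain.isSorted()    + " " + plain.isDistinct());    // false false
            System.out.println(sorted.isSorted()   + " " + sorted.isDistinct());   // true false
            System.out.println(distinct.isSorted() + " " + distinct.isDistinct()); // false true
        }
    }
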
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java Fri Jan 4 14:58:20 2008
@@ -153,4 +153,13 @@
return stringVal.hashCode();
}
+ @Override
+ public long getMemorySize() {
+ long used = 0;
+ if (stringVal != null) used += stringVal.length() * 2 + OBJECT_SIZE;
+ if (doubleVal != null) used += 8 + OBJECT_SIZE;
+ if (binaryVal != null) used += binaryVal.length + OBJECT_SIZE;
+ used += OBJECT_SIZE + 3 * REF_SIZE;
+ return used;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Fri Jan 4 14:58:20 2008
@@ -17,263 +17,308 @@
*/
package org.apache.pig.data;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
+import java.util.ArrayList;
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.util.Spillable;
import org.apache.pig.impl.mapreduceExec.PigMapReduce;
-
/**
* A collection of Tuples
*/
-public class DataBag extends Datum{
- protected List<Tuple> content;
- protected boolean isSorted = false;
-
- public DataBag() {
- content = new ArrayList<Tuple>();
- }
+public abstract class DataBag extends Datum implements Spillable {
+ // Container that holds the tuples. Actual object instantiated by
+ // subclasses.
+ protected Collection<Tuple> mContents;
- public DataBag(List<Tuple> c) {
- content = c;
- }
+ // Spill files we've created. These need to be removed in finalize.
+ protected ArrayList<File> mSpillFiles;
- public DataBag(Tuple t) {
- content = new ArrayList<Tuple>();
- content.add(t);
- }
+ // Total size, including tuples on disk. Stored here so we don't have
+ // to run through the disk when people ask.
+ protected long mSize = 0;
- public int cardinality() {
- return content.size();
- }
+ protected boolean mMemSizeChanged = false;
- public boolean isEmpty() {
- return content.size() == 0;
- }
-
- public int compareTo(Object other) {
- if (this == other)
- return 0;
- if (other instanceof DataAtom) return +1;
- if (other instanceof Tuple) return -1;
- if (other instanceof DataBag){
- DataBag bOther = (DataBag) other;
- if (this.cardinality() != bOther.cardinality()) {
- return (this.cardinality() - bOther.cardinality());
- }
-
- // same cardinality, so compare tuple by tuple ...
- if (!isSorted())
- this.sort();
- if (!bOther.isSorted())
- bOther.sort();
-
- Iterator<Tuple> thisIt = this.content();
- Iterator<Tuple> otherIt = bOther.content();
- while (thisIt.hasNext() && otherIt.hasNext()) {
- Tuple thisT = thisIt.next();
- Tuple otherT = otherIt.next();
-
- int c = thisT.compareTo(otherT);
- if (c != 0) return c;
- }
-
- return 0; // if we got this far, they must be equal
- }else{
- return -1;
- }
-
- }
-
- @Override
- public boolean equals(Object other) {
- return (compareTo(other) == 0);
+ protected long mMemSize = 0;
+
+ /**
+ * Get the number of elements in the bag, both in memory and on disk.
+ */
+ public long size() {
+ return mSize;
}
-
- public void sort() {
- Collections.sort(content);
+
+ /**
+ * Deprecated. Use size() instead.
+ */
+ public int cardinality() {
+ return (int)size();
}
- public void sort(EvalSpec spec) {
- Collections.sort(content, spec.getComparator());
- isSorted = true;
- }
+ /**
+ * Find out if the bag is sorted.
+ */
+ public abstract boolean isSorted();
- public void arrange(EvalSpec spec) {
- sort(spec);
- isSorted = true;
- }
+ /**
+ * Find out if the bag is distinct.
+ */
+ public abstract boolean isDistinct();
- public void distinct() {
-
- Collections.sort(content);
- isSorted = true;
- int curCall = 0;
-
- Tuple lastTup = null;
- for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
- if (curCall < notifyInterval - 1)
- curCall++;
- else
- {
- if (PigMapReduce.reporter != null)
- PigMapReduce.reporter.progress();
- curCall = 0;
- }
- Tuple thisTup = it.next();
-
- if (lastTup == null) {
- lastTup = thisTup;
- continue;
- }
-
- if (thisTup.compareTo(lastTup) == 0) {
- it.remove();
- } else {
- lastTup = thisTup;
- }
- }
- }
-
- public static int notifyInterval = 1000;
- public int numNotifies; // used for unit tests only
+ /**
+ * Get an iterator to the bag. For default and distinct bags,
+ * no particular order is guaranteed. For sorted bags the order
+ * is guaranteed to be sorted according
+ * to the provided comparator.
+ */
+ public abstract Iterator<Tuple> iterator();
+ /**
+ * Deprecated. Use iterator() instead.
+ */
+ @Deprecated
public Iterator<Tuple> content() {
- return new Iterator<Tuple>() {
- Iterator<Tuple> myIt;
- int curCall;
-
- {
- numNotifies = 0;
- myIt = content.iterator();
-
- }
- public final boolean hasNext(){
- return myIt.hasNext();
- }
- public final Tuple next(){
- if (curCall < notifyInterval - 1)
- curCall ++;
- else{
- if (PigMapReduce.reporter != null)
- PigMapReduce.reporter.progress();
- numNotifies ++;
- curCall = 0;
- }
- return myIt.next();
- }
- public final void remove(){
- myIt.remove();
- }
- };
+ return iterator();
}
-
+ /**
+ * Add a tuple to the bag.
+ * @param t tuple to add.
+ */
public void add(Tuple t) {
- if (t!=null)
- content.add(t);
+ synchronized (mContents) {
+ mMemSizeChanged = true;
+ mSize++;
+ mContents.add(t);
+ }
}
+ /**
+ * Add contents of a bag to the bag.
+ * @param b bag to add contents of.
+ */
public void addAll(DataBag b) {
-
- Iterator<Tuple> it = b.content();
- while (it.hasNext()) {
- add(it.next());
+ synchronized (mContents) {
+ mMemSizeChanged = true;
+ mSize += b.size();
+ Iterator<Tuple> i = b.iterator();
+ while (i.hasNext()) mContents.add(i.next());
}
}
- public void remove(Tuple d) {
- content.remove(d);
- }
-
+ // Do I need remove? I couldn't find it used anywhere.
+
/**
- * Returns the value of field i. Since there may be more than one tuple in the bag, this
- * function throws an exception if it is not the case that all tuples agree on this field
+ * Return the size of memory usage.
*/
- public DataAtom getField(int i) throws IOException {
- DataAtom val = null;
+ @Override
+ public long getMemorySize() {
+ if (!mMemSizeChanged) return mMemSize;
+
+ long used = 0;
+ // I can't afford to walk through all the tuples every time the
+ // memory manager wants to know if it's time to dump. Just sample
+ // the first 100 and see what we get. This may not be 100%
+ // accurate, but it's just an estimate anyway.
+ int j;
+ int numInMem = 0;
+ synchronized (mContents) {
+ numInMem = mContents.size();
+ // Measure only what's in memory, not what's on disk.
+ Iterator<Tuple> i = mContents.iterator();
+ for (j = 0; i.hasNext() && j < 100; j++) {
+ used += i.next().getMemorySize();
+ used += REF_SIZE;
+ }
+ }
+
+ if (numInMem > 100) {
+ // Estimate the per tuple size. Do it in integer arithmetic
+ // (even though it will be slightly less accurate) for speed.
+ used /= j;
+ used *= numInMem;
+ }
- for (Iterator<Tuple> it = content(); it.hasNext();) {
- DataAtom currentVal = it.next().getAtomField(i);
+ mMemSize = used;
+ mMemSizeChanged = false;
+ return used;
+ }
- if (val == null) {
- val = currentVal;
- } else {
- if (!val.strval().equals(currentVal.strval()))
- throw new IOException("Cannot call getField on a databag unless all tuples agree.");
+ /**
+ * Clear out the contents of the bag, both on disk and in memory.
+ * Any attempts to read after this is called will produce undefined
+ * results.
+ */
+ public void clear() {
+ synchronized (mContents) {
+ mContents.clear();
+ if (mSpillFiles != null) {
+ for (int i = 0; i < mSpillFiles.size(); i++) {
+ mSpillFiles.get(i).delete();
+ }
+ mSpillFiles.clear();
}
+ mSize = 0;
}
+ }
- if (val == null)
- throw new IOException("Cannot call getField on an empty databag.");
+ public int compareTo(Object other) {
+ // Do we really need to be able to compare to DataAtom and Tuple?
+ // When does that happen?
+ if (this == other)
+ return 0;
+ if (other instanceof DataBag){
+ DataBag bOther = (DataBag) other;
+ if (this.size() != bOther.size()) {
+ if (this.size() > bOther.size()) return 1;
+ else return -1;
+ }
- return val;
+ // Don't sort them, just go tuple by tuple.
+ Iterator<Tuple> thisIt = this.iterator();
+ Iterator<Tuple> otherIt = bOther.iterator();
+ while (thisIt.hasNext() && otherIt.hasNext()) {
+ Tuple thisT = thisIt.next();
+ Tuple otherT = otherIt.next();
+
+ int c = thisT.compareTo(otherT);
+ if (c != 0) return c;
+ }
+
+ return 0; // if we got this far, they must be equal
+ } else if (other instanceof DataAtom) {
+ return +1;
+ } else if (other instanceof Tuple) {
+ return -1;
+ } else {
+ return -1;
+ }
}
- public void clear(){
- content.clear();
- isSorted = false;
+ public boolean equals(Object other) {
+ return compareTo(other) == 0;
}
-
+ /**
+ * Write a bag's contents to disk.
+ * @param out DataOutput to write data to.
+ * @throws IOException (passes it on from underlying calls).
+ */
@Override
- public void write(DataOutput out) throws IOException {
- out.write(BAG);
- Tuple.encodeInt(out, cardinality());
- Iterator<Tuple> it = content();
- while (it.hasNext()) {
- Tuple item = it.next();
- item.write(out);
- }
+ public void write(DataOutput out) throws IOException {
+ // We don't care whether this bag was sorted or distinct because
+ // using the iterator to write it will guarantee those things come
+ // correctly. And on the other end there'll be no reason to waste
+ // time re-sorting or re-applying distinct.
+ out.write(BAG);
+ out.writeLong(size());
+ Iterator<Tuple> it = iterator();
+ while (it.hasNext()) {
+ Tuple item = it.next();
+ item.write(out);
+ }
}
-
- public static abstract class BagDelimiterTuple extends Tuple{}
- public static class StartBag extends BagDelimiterTuple{}
-
- public static class EndBag extends BagDelimiterTuple{}
-
- public static final Tuple startBag = new StartBag();
- public static final Tuple endBag = new EndBag();
-
+
+ /**
+ * Read a bag from disk.
+ * @param in DataInput to read data from.
+ * @throws IOException (passes it on from underlying calls).
+ */
static DataBag read(DataInput in) throws IOException {
- int size = Tuple.decodeInt(in);
- DataBag ret = BagFactory.getInstance().getNewBag();
+ long size = in.readLong();
+ // Always use a default data bag, as if it was sorted or distinct
+ // we're guaranteed it was written out that way already, and we
+ // don't need to mess with it.
+ DataBag ret = BagFactory.getInstance().newDefaultBag();
- for (int i = 0; i < size; i++) {
+ for (long i = 0; i < size; i++) {
Tuple t = new Tuple();
t.readFields(in);
ret.add(t);
}
return ret;
}
-
- public void markStale(boolean stale){}
-
+
+ /**
+ * This is used by FuncEvalSpec.FakeDataBag.
+ * @param stale Set stale state.
+ */
+ public void markStale(boolean stale)
+ {
+ }
+
+ /**
+ * Write the bag into a string.
+ */
@Override
- public String toString() {
+ public String toString() {
StringBuffer sb = new StringBuffer();
sb.append('{');
- Iterator<Tuple> it = content();
+ Iterator<Tuple> it = iterator();
while ( it.hasNext() ) {
- Tuple t = it.next();
- String s = t.toString();
- sb.append(s);
- if (it.hasNext())
- sb.append(", ");
+ Tuple t = it.next();
+ String s = t.toString();
+ sb.append(s);
+ if (it.hasNext()) sb.append(", ");
}
sb.append('}');
return sb.toString();
}
-
- public boolean isSorted(){
- return isSorted;
+
+ /**
+ * Need to override finalize to clean out the mSpillFiles array.
+ */
+ @Override
+ protected void finalize() {
+ if (mSpillFiles != null) {
+ for (int i = 0; i < mSpillFiles.size(); i++) {
+ mSpillFiles.get(i).delete();
+ }
+ }
}
+
+ /**
+ * Get a file to spill contents to. The file will be registered in the
+ * mSpillFiles array.
+ * @return stream to write tuples to.
+ */
+ protected DataOutputStream getSpillFile() throws IOException {
+ if (mSpillFiles == null) {
+ // We want to keep the list as small as possible.
+ mSpillFiles = new ArrayList<File>(1);
+ }
+
+ File f = File.createTempFile("pigbag", null);
+ f.deleteOnExit();
+ mSpillFiles.add(f);
+ return new DataOutputStream(new BufferedOutputStream(
+ new FileOutputStream(f)));
+ }
+
+ /**
+ * Report progress to HDFS.
+ */
+ protected void reportProgress() {
+ if (PigMapReduce.reporter != null) {
+ PigMapReduce.reporter.progress();
+ }
+ }
+
+ public static abstract class BagDelimiterTuple extends Tuple{}
+ public static class StartBag extends BagDelimiterTuple{}
+
+ public static class EndBag extends BagDelimiterTuple{}
+ public static final Tuple startBag = new StartBag();
+ public static final Tuple endBag = new EndBag();
+
+ protected static final int MAX_SPILL_FILES = 100;
+
}
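
The rewritten write() above emits the BAG marker byte, the element count as a long, and then each tuple in iterator order, so sorted or distinct bags need no re-sorting or re-deduplication when read back. A minimal serialization sketch (class name is illustrative; reading back goes through the package-private DataBag.read(), so only the write side is shown):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class BagWriteSketch {
        public static void main(String[] args) throws IOException {
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            bag.add(new Tuple());
            bag.add(new Tuple());

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            bag.write(out);   // BAG marker, size() as a long, then each tuple
            out.flush();
            System.out.println("serialized " + bag.size() + " tuples into "
                    + bytes.size() + " bytes");
        }
    }
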
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataMap.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataMap.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataMap.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataMap.java Fri Jan 4 14:58:20 2008
@@ -24,6 +24,9 @@
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
+import java.util.Iterator;
+import java.lang.String;
+
public class DataMap extends Datum {
@@ -141,5 +144,19 @@
public Map<String, Datum> content(){
return content;
}
+
+ @Override
+ public long getMemorySize() {
+ long used = 0;
+ Iterator<Map.Entry<String, Datum> > i = content.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<String, Datum> e = i.next();
+ used += e.getKey().length() * 2 + OBJECT_SIZE + REF_SIZE;
+ used += e.getValue().getMemorySize() + REF_SIZE;
+ }
+
+ used += 2 * OBJECT_SIZE + REF_SIZE;
+ return used;
+ }
}
Modified: incubator/pig/trunk/src/org/apache/pig/data/Datum.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/Datum.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/Datum.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/Datum.java Fri Jan 4 14:58:20 2008
@@ -32,10 +32,15 @@
public static final byte RECORD_2 = 0x31;
public static final byte RECORD_3 = 0x41;
+ public static final int OBJECT_SIZE = 8;
+ public static final int REF_SIZE = 4;
+
@Override
public abstract boolean equals(Object o);
public abstract void write(DataOutput out) throws IOException;
+
+ public abstract long getMemorySize();
}
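
With these constants, the getMemorySize() implementations earlier in the diff stay deliberately rough: for example, a DataAtom holding only a five-character string is estimated at 5*2 (chars) + 8 (String object) + 8 (atom object) + 3*4 (field references) = 38 bytes, a JVM-independent approximation rather than an exact footprint.
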
Added: incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+
+import org.apache.pig.impl.util.PigLogger;
+
+
+/**
+ * An unordered collection of Tuples (possibly) with multiples.
+ */
+public class DefaultDataBag extends DataBag {
+
+ public DefaultDataBag() {
+ mContents = new ArrayList<Tuple>();
+ }
+
+ @Override
+ public boolean isSorted() {
+ return false;
+ }
+
+ @Override
+ public boolean isDistinct() {
+ return false;
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new DefaultDataBagIterator();
+ }
+
+ public long spill() {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ // Lock the container before I spill, so that iterators aren't
+ // trying to read while I'm mucking with the container.
+ long spilled = 0;
+ synchronized (mContents) {
+ try {
+ DataOutputStream out = getSpillFile();
+ Iterator<Tuple> i = mContents.iterator();
+ while (i.hasNext()) {
+ i.next().write(out);
+ spilled++;
+ // Report progress every 16384 records written.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ PigLogger.getLogger().error(
+ "Unable to spill contents to disk", ioe);
+ return 0;
+ }
+ mContents.clear();
+ }
+ return spilled;
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag. This
+ * iterator has a couple of issues to deal with. First, data can be
+ * stored in a combination of in memory and on disk. Second, the bag
+ * may be asked to spill while the iterator is reading it. This means
+ * that it will be pointing to someplace in memory and suddenly it
+ * will need to switch to a disk file.
+ */
+ private class DefaultDataBagIterator implements Iterator<Tuple> {
+ // We have to buffer a tuple because there's no easy way for next
+ // to tell whether or not there's another tuple available, other
+ // than to read it.
+ private Tuple mBuf = null;
+ private int mMemoryPtr = 0;
+ private int mFilePtr = 0;
+ private DataInputStream mIn = null;
+ private int mCntr = 0;
+
+ DefaultDataBagIterator() {
+ }
+
+ public boolean hasNext() {
+ // See if we can find a tuple. If so, buffer it.
+ mBuf = next();
+ return mBuf != null;
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+ // If there's one in the buffer, use that one.
+ if (mBuf != null) {
+ Tuple t = mBuf;
+ mBuf = null;
+ return t;
+ }
+
+ // See if we've been reading from memory or not.
+ if (mMemoryPtr > 0) {
+ // If there's still data in memory, keep reading from
+ // there.
+ // Lock before we check the size, obtain a reader lock,
+ // from this point forward we can't have them spilling on
+ // us.
+ synchronized (mContents) {
+ if (mContents.size() > 0) {
+ return readFromMemory();
+ }
+ }
+
+ // The container spilled since our last read. Don't
+ // need to hold the lock now, as it's already
+ // spilled on us.
+
+ // Our file pointer will already point to the new
+ // spill file (because it was either already 0 or had
+ // been incremented past the end of the old
+ // mSpillFiles.size()). We need to open the new file
+ // and then fast forward past all of the tuples we've
+ // already read. Then we need to reset mMemoryPtr so
+ // we know to read from the file next time we come
+ // through.
+ try {
+ mIn = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(mSpillFiles.get(mFilePtr++))));
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should never
+ // happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find our spill file", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ Tuple t = new Tuple();
+ for (int i = 0; i < mMemoryPtr; i++) {
+ try {
+ t.readFields(mIn);
+ } catch (EOFException eof) {
+ // This should never happen, it means we
+ // didn't dump all of our tuples to disk.
+ PigLogger.getLogger().fatal(
+ "Ran out of tuples too soon.", eof);
+ throw new RuntimeException("Ran out of tuples to read prematurely.");
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ mMemoryPtr = 0;
+ return readFromFile();
+ }
+
+ // We haven't read from memory yet, so keep trying to read
+ // from the file
+ return readFromFile();
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void remove() {}
+
+ private Tuple readFromFile() {
+ if (mIn != null) {
+ // We already have a file open
+ Tuple t = new Tuple();
+ try {
+ t.readFields(mIn);
+ return t;
+ } catch (EOFException eof) {
+ // Fall through to the next case where we find the
+ // next file, or go to memory
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ // Need to open the next file, if there is one. Have to lock
+ // here, because otherwise we could decide there's no more
+ // files and between the time we decide that and start trying
+ // to read from memory the container could spill, and then
+ // we're stuck. If there's another file to read, we can
+ // unlock immediately. If there isn't, we need to hold the
+ // lock and go into readFromMemory().
+ synchronized (mContents) {
+ if (mSpillFiles == null || mFilePtr >= mSpillFiles.size()) {
+ // We've read everything there is to read from the files, go
+ // look in memory.
+ return readFromMemory();
+ }
+ }
+
+ // Open the next file, then call ourselves again as it
+ // will enter the if above.
+ try {
+ mIn = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(mSpillFiles.get(mFilePtr++))));
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should never
+ // happen.
+ PigLogger.getLogger().fatal("Unable to find our spill file",
+ fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ return readFromFile();
+ }
+
+ // This should only be called once we know we haven't spilled. It
+ // assumes that the mContents lock is already held before we enter
+ // this function.
+ private Tuple readFromMemory() {
+ if (mContents.size() == 0) return null;
+
+ if (mMemoryPtr < mContents.size()) {
+ return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+ } else {
+ return null;
+ }
+ }
+ }
+}
+
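
Normally spill() is invoked by the SpillableMemoryManager under memory pressure; the sketch below calls it directly just to illustrate that the iterator above transparently reads the spill file and then whatever remains in memory. The class name and tuple counts are illustrative; the cast is only there to reach spill(), which this file declares public:

    import java.util.Iterator;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.DefaultDataBag;
    import org.apache.pig.data.Tuple;

    public class SpillSketch {
        public static void main(String[] args) {
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for (int i = 0; i < 10000; i++) {
                bag.add(new Tuple());              // placeholder tuples
            }

            // Force everything currently in memory out to a temp spill file.
            long written = ((DefaultDataBag) bag).spill();
            System.out.println("spilled " + written + " tuples to disk");

            // The iterator reads the spill file first, then whatever is
            // still (or newly) in memory; the caller never notices.
            long seen = 0;
            for (Iterator<Tuple> it = bag.iterator(); it.hasNext(); it.next()) {
                seen++;
            }
            System.out.println("iterated " + seen + " of " + bag.size() + " tuples");
        }
    }
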
Added: incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
+import java.util.Arrays;
+import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.PigLogger;
+
+
+
+/**
+ * An unordered collection of Tuples with no multiples. Data is
+ * stored without duplicates as it comes in. When it is time to spill,
+ * that data is sorted and written to disk. It must also be sorted upon
+ * the first read, otherwise if a spill happened after that the iterators
+ * would have no way to find their place in the new file. The data is
+ * stored in a HashSet. When it is time to sort it is placed in an
+ * ArrayList and then sorted. Despite all these machinations, this was
+ * found to be faster than storing it in a TreeSet.
+ */
+public class DistinctDataBag extends DataBag {
+ public DistinctDataBag() {
+ mContents = new HashSet<Tuple>();
+ }
+
+ @Override
+ public boolean isSorted() {
+ return false;
+ }
+
+ @Override
+ public boolean isDistinct() {
+ return true;
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new DistinctDataBagIterator();
+ }
+
+ @Override
+ public void add(Tuple t) {
+ synchronized (mContents) {
+ if (mContents.add(t)) {
+ mSize++;
+ }
+ }
+ }
+
+ @Override
+ public void addAll(DataBag b) {
+ synchronized (mContents) {
+ mSize += b.size();
+ Iterator<Tuple> i = b.iterator();
+ while (i.hasNext()) {
+ if (mContents.add(i.next())) {
+ mSize++;
+ }
+ }
+ }
+ }
+
+
+ public long spill() {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ // Lock the container before I spill, so that iterators aren't
+ // trying to read while I'm mucking with the container.
+ long spilled = 0;
+ synchronized (mContents) {
+ try {
+ DataOutputStream out = getSpillFile();
+ // If we've already started reading, then it will already be
+ // sorted into an array list. If not, we need to sort it
+ // before writing.
+ if (mContents instanceof ArrayList) {
+ Iterator<Tuple> i = mContents.iterator();
+ while (i.hasNext()) {
+ i.next().write(out);
+ spilled++;
+ // Report progress every 16384 records written.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+ } else {
+ Tuple[] array = new Tuple[mContents.size()];
+ mContents.toArray(array);
+ Arrays.sort(array);
+ for (int i = 0; i < array.length; i++) {
+ array[i].write(out);
+ spilled++;
+ // Report progress every 16384 records written.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ PigLogger.getLogger().error(
+ "Unable to spill contents to disk", ioe);
+ return 0;
+ }
+ mContents.clear();
+ }
+ return spilled;
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag. This
+ * iterator has a couple of issues to deal with. First, data can be
+ * stored in a combination of in memory and on disk. Second, the bag
+ * may be asked to spill while the iterator is reading it. This means
+ * that it will be pointing to someplace in memory and suddenly it
+ * will need to switch to a disk file.
+ */
+ private class DistinctDataBagIterator implements Iterator<Tuple> {
+
+ private class TContainer implements Comparable<TContainer> {
+ public Tuple tuple;
+ public int fileNum;
+
+ public int compareTo(TContainer other) {
+ return tuple.compareTo(other.tuple);
+ }
+ }
+
+ // We have to buffer a tuple because there's no easy way for next
+ // to tell whether or not there's another tuple available, other
+ // than to read it.
+ private Tuple mBuf = null;
+ private int mMemoryPtr = 0;
+ private TreeSet<TContainer> mMergeTree = null;
+ private ArrayList<DataInputStream> mStreams = null;
+ private int mCntr = 0;
+
+ DistinctDataBagIterator() {
+ // If this is the first read, we need to sort the data.
+ synchronized (mContents) {
+ if (mContents instanceof HashSet) {
+ preMerge();
+ // We're the first reader, we need to sort the data.
+ // This is in case it gets dumped under us.
+ ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+ Collections.sort(l);
+ mContents = l;
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ // See if we can find a tuple. If so, buffer it.
+ mBuf = next();
+ return mBuf != null;
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+ // If there's one in the buffer, use that one.
+ if (mBuf != null) {
+ Tuple t = mBuf;
+ mBuf = null;
+ return t;
+ }
+
+ // Check to see if we just need to read from memory.
+ boolean spilled = false;
+ synchronized (mContents) {
+ if (mSpillFiles == null || mSpillFiles.size() == 0) {
+ return readFromMemory();
+ }
+
+ if (mMemoryPtr > 0 && mContents.size() == 0) {
+ spilled = true;
+ }
+ }
+
+ // Check to see if we were reading from memory but we spilled
+ if (spilled) {
+ DataInputStream in;
+ // We need to open the new file
+ // and then fast forward past all of the tuples we've
+ // already read. Then we need to place the first tuple
+ // from that file in the priority queue. Whatever tuples
+ // from memory that were already in the queue will be fine,
+ // as they're guaranteed to be ahead of the point we fast
+ // forward to.
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(mSpillFiles.get(
+ mSpillFiles.size() - 1))));
+ if (mStreams == null) {
+ mMergeTree = new TreeSet<TContainer>();
+ // We didn't have any files before this spill.
+ mStreams = new ArrayList<DataInputStream>(1);
+ }
+ mStreams.add(in);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should never
+ // happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find our spill file", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+
+ // Fast forward past the tuples we've already put in the
+ // queue.
+ Tuple t = new Tuple();
+ for (int i = 0; i < mMemoryPtr; i++) {
+ try {
+ t.readFields(in);
+ } catch (EOFException eof) {
+ // This should never happen, it means we
+ // didn't dump all of our tuples to disk.
+ throw new RuntimeException("Ran out of tuples to read prematurely.");
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ mMemoryPtr = 0;
+ // Add the next tuple from this file to the queue.
+ addToQueue(null, mSpillFiles.size() - 1);
+ // Fall through to read the next entry from the priority
+ // queue.
+ }
+
+ // We have spill files, so we need to read the next tuple from
+ // one of those files or from memory.
+ return readFromTree();
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void remove() {}
+
+ private Tuple readFromTree() {
+ if (mMergeTree == null) {
+ // First read, we need to set up the queue and the array of
+ // file streams
+ mMergeTree = new TreeSet<TContainer>();
+
+ // Add one to the size in case we spill later.
+ mStreams =
+ new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+ Iterator<File> i = mSpillFiles.iterator();
+ while (i.hasNext()) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ // Add the first tuple from this file into the
+ // merge queue.
+ addToQueue(null, mStreams.size() - 1);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find out spill file.", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ }
+
+ // Prime one from memory too
+ if (mContents.size() > 0) {
+ addToQueue(null, -1);
+ }
+ }
+
+ if (mMergeTree.size() == 0) return null;
+
+ // Pop the top one off the queue
+ TContainer c = mMergeTree.first();
+ mMergeTree.remove(c);
+
+ // Add the next tuple from wherever we read from into the
+ // queue. Buffer the tuple we're returning, as we'll be
+ // reusing c.
+ Tuple t = c.tuple;
+ addToQueue(c, c.fileNum);
+
+ return t;
+ }
+
+ private void addToQueue(TContainer c, int fileNum) {
+ if (c == null) {
+ c = new TContainer();
+ }
+ c.fileNum = fileNum;
+
+ if (fileNum == -1) {
+ // Need to read from memory. We may have spilled since
+ // this tuple was put in the queue, and hence memory might
+ // be empty. But I don't care, as then I just won't add
+ // any more from memory.
+ synchronized (mContents) {
+ do {
+ c.tuple = readFromMemory();
+ if (c.tuple != null) {
+ // If we find a unique entry, then add it to the queue.
+ // Otherwise ignore it and keep reading.
+ if (mMergeTree.add(c)) {
+ return;
+ }
+ }
+ } while (c.tuple != null);
+ }
+ return;
+ }
+
+ // Read the next tuple from the indicated file
+ DataInputStream in = mStreams.get(fileNum);
+ if (in != null) {
+ // There's still data in this file
+ c.tuple = new Tuple();
+ do {
+ try {
+ c.tuple.readFields(in);
+ // If we find a unique entry, then add it to the queue.
+ // Otherwise ignore it and keep reading. If we run out
+ // of tuples to read that's fine, we just won't add a
+ // new one from this file.
+ if (mMergeTree.add(c)) {
+ return;
+ }
+ } catch (EOFException eof) {
+ // Out of tuples in this file. Set our slot in the
+ // array to null so we don't keep trying to read from
+ // this file.
+ mStreams.set(fileNum, null);
+ return;
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ } while (true);
+ }
+ }
+
+ // Function assumes that the reader lock is already held before we enter
+ // this function.
+ private Tuple readFromMemory() {
+ if (mContents.size() == 0) return null;
+
+ if (mMemoryPtr < mContents.size()) {
+ return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Pre-merge if there are too many spill files. This avoids the issue
+ * of having too large a fan out in our merge. Experimentation by
+ * the hadoop team has shown that 100 is about the optimal number
+ * of spill files. This function modifies the mSpillFiles array
+ * and assumes the write lock is already held. It will not unlock it.
+ *
+ * Tuples are reconstituted as tuples, evaluated, and rewritten as
+ * tuples. This is expensive, but I don't know how to read tuples
+ * from the file otherwise.
+ *
+ * This function is slightly different than the one in
+ * SortedDataBag, as it uses a TreeSet instead of a PriorityQ.
+ */
+ private void preMerge() {
+ if (mSpillFiles == null ||
+ mSpillFiles.size() <= MAX_SPILL_FILES) {
+ return;
+ }
+
+ // While there are more than max spill files, gather max spill
+ // files together and merge them into one file. Then remove the others
+ // from mSpillFiles. The new spill files are attached at the
+ // end of the list, so I can just keep going until I get a
+ // small enough number without too much concern over uneven
+ // size merges. Convert mSpillFiles to a linked list since
+ // we'll be removing pieces from the middle and we want to do
+ // it efficiently.
+ try {
+ LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ while (ll.size() > MAX_SPILL_FILES) {
+ ListIterator<File> i = ll.listIterator();
+ mStreams =
+ new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+ mMergeTree = new TreeSet<TContainer>();
+
+ for (int j = 0; j < MAX_SPILL_FILES; j++) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ addToQueue(null, mStreams.size() - 1);
+ i.remove();
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find out spill file.", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ }
+
+ // Get a new spill file. This adds one to the end of
+ // the spill files list. So I need to append it to my
+ // linked list as well so that it's still there when I
+ // move my linked list back to the spill files.
+ try {
+ DataOutputStream out = getSpillFile();
+ ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+ Tuple t;
+ while ((t = readFromTree()) != null) {
+ t.write(out);
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ // Now, move our new list back to the spill files array.
+ mSpillFiles = new ArrayList<File>(ll);
+ } finally {
+ // Reset mStreams and mMerge so that they'll be allocated
+ // properly for regular merging.
+ mStreams = null;
+ mMergeTree = null;
+ }
+ }
+ }
+
+}
+
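
The readFromTree()/addToQueue() pair above implements a k-way merge that keeps one head element per input in a TreeSet and relies on TreeSet.add() returning false to drop duplicates. A standalone sketch of that idea with plain strings (all names here are illustrative; none of this is Pig code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.TreeSet;

    public class MergeDistinctSketch {
        // One entry per source: the current head value plus which source it came from.
        static class Head implements Comparable<Head> {
            final String value;
            final int source;
            Head(String value, int source) { this.value = value; this.source = source; }
            public int compareTo(Head other) { return value.compareTo(other.value); }
        }

        public static void main(String[] args) {
            List<Iterator<String>> sources = new ArrayList<Iterator<String>>();
            sources.add(Arrays.asList("a", "c", "e").iterator());
            sources.add(Arrays.asList("a", "b", "e").iterator());

            TreeSet<Head> tree = new TreeSet<Head>();
            for (int i = 0; i < sources.size(); i++) refill(tree, sources, i);

            while (!tree.isEmpty()) {
                Head h = tree.first();
                tree.remove(h);
                System.out.println(h.value);       // prints a, b, c, e
                refill(tree, sources, h.source);   // pull the next head from that source
            }
        }

        // Keep reading from one source until a value not already in the tree is
        // found (TreeSet.add() returns false for duplicates), mirroring addToQueue().
        static void refill(TreeSet<Head> tree, List<Iterator<String>> sources, int i) {
            Iterator<String> it = sources.get(i);
            while (it.hasNext()) {
                if (tree.add(new Head(it.next(), i))) return;
            }
        }
    }
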
Added: incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+import java.util.Iterator;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.PigLogger;
+
+
+
+/**
+ * An ordered collection of Tuples (possibly) with multiples. Data is
+ * stored unsorted as it comes in, and only sorted when it is time to dump
+ * it to a file or when the first iterator is requested. Experimentation
+ * found this to be faster than storing it sorted to begin with.
+ */
+public class SortedDataBag extends DataBag {
+ private Comparator<Tuple> mComp;
+ private boolean mReadStarted = false;
+
+ private class DefaultComparator implements Comparator<Tuple> {
+ public int compare(Tuple t1, Tuple t2) {
+ return t1.compareTo(t2);
+ }
+
+ public boolean equals(Object o) {
+ return false;
+ }
+
+ }
+
+ public SortedDataBag(EvalSpec spec) {
+ if (spec == null) {
+ mComp = new DefaultComparator();
+ } else {
+ mComp = spec.getComparator();
+ }
+
+ mContents = new ArrayList<Tuple>();
+ }
+
+ @Override
+ public boolean isSorted() {
+ return true;
+ }
+
+ @Override
+ public boolean isDistinct() {
+ return false;
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new SortedDataBagIterator();
+ }
+
+ public long spill() {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ // Lock the container before I spill, so that iterators aren't
+ // trying to read while I'm mucking with the container.
+ long spilled = 0;
+ synchronized (mContents) {
+ try {
+ DataOutputStream out = getSpillFile();
+ // Have to sort the data before we can dump it. It's bogus
+ // that we have to do this under the lock, but there's no way
+ // around it.
+ Collections.sort((ArrayList<Tuple>)mContents, mComp);
+ Iterator<Tuple> i = mContents.iterator();
+ while (i.hasNext()) {
+ i.next().write(out);
+ spilled++;
+ // Report progress every 16384 records written.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ PigLogger.getLogger().error(
+ "Unable to spill contents to disk", ioe);
+ return 0;
+ }
+ mContents.clear();
+ }
+ return spilled;
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag. This
+ * iterator has a couple of issues to deal with. First, data can be
+ * stored in a combination of in memory and on disk. Second, the bag
+ * may be asked to spill while the iterator is reading it. This means
+ * that it will be pointing to someplace in memory and suddenly it
+ * will need to switch to a disk file.
+ */
+ private class SortedDataBagIterator implements Iterator<Tuple> {
+
+ private class PQContainer implements Comparable<PQContainer> {
+ public Tuple tuple;
+ public int fileNum;
+
+ public int compareTo(PQContainer other) {
+ return mComp.compare(tuple, other.tuple);
+ }
+ }
+
+ // We have to buffer a tuple because there's no easy way for next
+ // to tell whether or not there's another tuple available, other
+ // than to read it.
+ private Tuple mBuf = null;
+ private int mMemoryPtr = 0;
+ private PriorityQueue<PQContainer> mMergeQ = null;
+ private ArrayList<DataInputStream> mStreams = null;
+ private int mCntr = 0;
+
+ SortedDataBagIterator() {
+ // If this is the first read, we need to sort the data.
+ synchronized (mContents) {
+ if (!mReadStarted) {
+ preMerge();
+ Collections.sort((ArrayList<Tuple>)mContents, mComp);
+ mReadStarted = true;
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ // See if we can find a tuple. If so, buffer it.
+ mBuf = next();
+ return mBuf != null;
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+ // If there's one in the buffer, use that one.
+ if (mBuf != null) {
+ Tuple t = mBuf;
+ mBuf = null;
+ return t;
+ }
+
+ // Check to see if we just need to read from memory.
+ boolean spilled = false;
+ synchronized (mContents) {
+ if (mSpillFiles == null || mSpillFiles.size() == 0) {
+ return readFromMemory();
+ }
+
+ // Check to see if we were reading from memory but we spilled
+ if (mMemoryPtr > 0 && mContents.size() == 0) {
+ spilled = true;
+ }
+ }
+
+ if (spilled) {
+ DataInputStream in;
+ // We need to open the new file
+ // and then fast forward past all of the tuples we've
+ // already read. Then we need to place the first tuple
+ // from that file in the priority queue. Whatever tuples
+ // from memory that were already in the queue will be fine,
+ // as they're guaranteed to be ahead of the point we fast
+                // forward to.
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(mSpillFiles.get(
+ mSpillFiles.size() - 1))));
+ if (mStreams == null) {
+ // We didn't have any files before this spill.
+ mMergeQ = new PriorityQueue<PQContainer>(1);
+ mStreams = new ArrayList<DataInputStream>(1);
+ }
+ mStreams.add(in);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should never
+ // happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find our spill file", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+
+                // Fast forward past the tuples we've already put in the
+ // queue.
+ Tuple t = new Tuple();
+ for (int i = 0; i < mMemoryPtr; i++) {
+ try {
+ t.readFields(in);
+ } catch (EOFException eof) {
+                        // This should never happen; it means we
+                        // didn't dump all of our tuples to disk.
+ PigLogger.getLogger().fatal(
+ "Ran out of tuples too soon.", eof);
+ throw new RuntimeException("Ran out of tuples to read prematurely.");
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ mMemoryPtr = 0;
+ // Add the next tuple from this file to the queue.
+ addToQueue(null, mSpillFiles.size() - 1);
+ // Fall through to read the next entry from the priority
+ // queue.
+ }
+
+ // We have spill files, so we need to read the next tuple from
+ // one of those files or from memory.
+ return readFromPriorityQ();
+ }
+
+ /**
+         * Not implemented; calling this method has no effect.
+ */
+ public void remove() {}
+
+ private Tuple readFromPriorityQ() {
+ if (mMergeQ == null) {
+                // First read; we need to set up the queue and the array of
+                // file streams.
+ // Add one to the size for the list in memory.
+ mMergeQ =
+ new PriorityQueue<PQContainer>(mSpillFiles.size() + 1);
+
+ // Add one to the size in case we spill later.
+ mStreams =
+ new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+ Iterator<File> i = mSpillFiles.iterator();
+ while (i.hasNext()) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ // Add the first tuple from this file into the
+ // merge queue.
+ addToQueue(null, mStreams.size() - 1);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find our spill file", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ }
+
+ // Prime one from memory too
+ if (mContents.size() > 0) {
+ addToQueue(null, -1);
+ }
+ }
+
+ // Pop the top one off the queue
+ PQContainer c = mMergeQ.poll();
+ if (c == null) return null;
+
+            // Add the next tuple from wherever we read from into the
+ // queue. Buffer the tuple we're returning, as we'll be
+ // reusing c.
+ Tuple t = c.tuple;
+ addToQueue(c, c.fileNum);
+
+ return t;
+ }
+
+ private void addToQueue(PQContainer c, int fileNum) {
+ if (c == null) {
+ c = new PQContainer();
+ }
+ c.fileNum = fileNum;
+
+ if (fileNum == -1) {
+ // Need to read from memory. We may have spilled since
+ // this tuple was put in the queue, and hence memory might
+ // be empty. But I don't care, as then I just won't add
+ // any more from memory.
+ synchronized (mContents) {
+ c.tuple = readFromMemory();
+ }
+ if (c.tuple != null) {
+ mMergeQ.add(c);
+ }
+ return;
+ }
+
+ // Read the next tuple from the indicated file
+ DataInputStream in = mStreams.get(fileNum);
+ if (in != null) {
+ // There's still data in this file
+ c.tuple = new Tuple();
+ try {
+ c.tuple.readFields(in);
+ mMergeQ.add(c);
+ } catch (EOFException eof) {
+ // Out of tuples in this file. Set our slot in the
+ // array to null so we don't keep trying to read from
+ // this file.
+ mStreams.set(fileNum, null);
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+ "Unable to read our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+
+ }
+ }
+
+        // Assumes that the reader lock is already held before this
+        // function is entered.
+ private Tuple readFromMemory() {
+ if (mContents.size() == 0) return null;
+
+ if (mMemoryPtr < mContents.size()) {
+ return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+         * Pre-merge if there are too many spill files. This avoids having
+         * too large a fan-out in our merge. Experimentation by the Hadoop
+         * team has shown that 100 is about the optimal number of spill
+         * files. This function modifies the mSpillFiles array and assumes
+         * the write lock is already held. It will not unlock it.
+         *
+         * Spilled records are read back in as tuples, merged through the
+         * priority queue, and rewritten as tuples. This is expensive, but
+         * it is necessary in order to apply the sort spec that was provided.
+ */
+ private void preMerge() {
+ if (mSpillFiles == null ||
+ mSpillFiles.size() <= MAX_SPILL_FILES) {
+ return;
+ }
+
+ // While there are more than max spill files, gather max spill
+ // files together and merge them into one file. Then remove the others
+ // from mSpillFiles. The new spill files are attached at the
+ // end of the list, so I can just keep going until I get a
+ // small enough number without too much concern over uneven
+ // size merges. Convert mSpillFiles to a linked list since
+ // we'll be removing pieces from the middle and we want to do
+ // it efficiently.
+ try {
+ LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ while (ll.size() > MAX_SPILL_FILES) {
+ ListIterator<File> i = ll.listIterator();
+ mStreams =
+ new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+ mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);
+
+ for (int j = 0; j < MAX_SPILL_FILES; j++) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ addToQueue(null, mStreams.size() - 1);
+ i.remove();
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+                        // never happen.
+ PigLogger.getLogger().fatal(
+ "Unable to find our spill file", fnfe);
+ throw new RuntimeException(fnfe);
+ }
+ }
+
+ // Get a new spill file. This adds one to the end of
+ // the spill files list. So I need to append it to my
+ // linked list as well so that it's still there when I
+ // move my linked list back to the spill files.
+ try {
+ DataOutputStream out = getSpillFile();
+ ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+ Tuple t;
+ while ((t = readFromPriorityQ()) != null) {
+ t.write(out);
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ PigLogger.getLogger().fatal(
+                            "Unable to write our spill file", ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ // Now, move our new list back to the spill files array.
+ mSpillFiles = new ArrayList<File>(ll);
+ } finally {
+                // Reset mStreams and mMergeQ so that they'll be allocated
+ // properly for regular merging.
+ mStreams = null;
+ mMergeQ = null;
+ }
+ }
+ }
+}
+
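The SortedDataBagIterator above is a standard k-way merge: the current head of
every sorted source (each spill file plus the in-memory list) sits in a
priority queue, and every poll of the queue is followed by reading the next
record from the source that was just polled. A minimal, self-contained sketch
of that pattern is shown below; the class and method names are illustrative
only and are not Pig APIs.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
        // Holds the current head of one sorted run plus the iterator used
        // to pull that run's next element.
        private static class Head implements Comparable<Head> {
            final int value;
            final Iterator<Integer> source;

            Head(int value, Iterator<Integer> source) {
                this.value = value;
                this.source = source;
            }

            public int compareTo(Head other) {
                return (value < other.value) ? -1 : ((value == other.value) ? 0 : 1);
            }
        }

        // Merges already-sorted runs into one sorted list.
        public static List<Integer> merge(List<List<Integer>> sortedRuns) {
            PriorityQueue<Head> queue = new PriorityQueue<Head>();
            // Prime the queue with the first element of each run.
            for (List<Integer> run : sortedRuns) {
                Iterator<Integer> it = run.iterator();
                if (it.hasNext()) {
                    queue.add(new Head(it.next(), it));
                }
            }
            List<Integer> merged = new ArrayList<Integer>();
            // Pop the smallest head, then refill from the run it came from.
            while (!queue.isEmpty()) {
                Head h = queue.poll();
                merged.add(h.value);
                if (h.source.hasNext()) {
                    queue.add(new Head(h.source.next(), h.source));
                }
            }
            return merged;
        }
    }

SortedDataBag applies the same pattern with DataInputStreams over spill files
in place of the run iterators, and treats the in-memory list as one more run
(fileNum == -1 in PQContainer).
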
Modified: incubator/pig/trunk/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/Tuple.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/Tuple.java Fri Jan 4 14:58:20 2008
@@ -161,8 +161,8 @@
}
} else if (field instanceof DataBag) {
DataBag b = (DataBag) field;
- if (b.cardinality() == 1) {
- Tuple t = b.content().next();
+ if (b.size() == 1) {
+ Tuple t = b.iterator().next();
if (t.arity() == 1) {
return t.getAtomField(0);
}
@@ -180,8 +180,8 @@
return (Tuple) field;
} else if (field instanceof DataBag) {
DataBag b = (DataBag) field;
- if (b.cardinality() == 1) {
- return b.content().next();
+ if (b.size() == 1) {
+ return b.iterator().next();
}
}
@@ -356,5 +356,18 @@
break;
}
return i;
+ }
+
+ @Override
+ public long getMemorySize() {
+ long used = 0;
+ try {
+ int sz = fields.size();
+ for (int i = 0; i < sz; i++) used += getField(i).getMemorySize();
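+            // Add the fixed per-tuple overhead: two objects plus one reference.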
+ used += 2 * OBJECT_SIZE + REF_SIZE;
+ } catch (IOException ioe) {
+ // Not really much I can do here.
+ }
+ return used;
}
}