Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/28 17:26:59 UTC
svn commit: r830664 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/builtin/ src/org/apache/pig/data/
test/org/apache/pig/test/
Author: gates
Date: Wed Oct 28 16:26:58 2009
New Revision: 830664
URL: http://svn.apache.org/viewvc?rev=830664&view=rev
Log:
PIG-1037: Converted sorted and distinct bags to use the new active spilling paradigm.
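The new bags spill on their own once an estimated in-memory tuple limit is
reached, instead of waiting for the SpillableMemoryManager to ask. The old
implementations stay available; going by the properties this patch reads, a
job can opt out per bag kind:

    pig.cachedbag.memusage        fraction of the heap each bag may use (0.1 if unset)
    pig.cachedbag.type            "default" keeps NonSpillableDataBag in POCombinerPackage
    pig.cachedbag.sort.type       "default" keeps the factory's sorted bag in POSort
    pig.cachedbag.distinct.type   "default" keeps the factory's distinct bag in PODistinct and Distinct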
Added:
hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Oct 28 16:26:58 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-1037: Converted sorted and distinct bags to use the new active spilling
+ paradigm (yinghe via gates)
+
PIG-1051: FINDBUGS: NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE (olgan)
PIG-1050: FINDBUGS: DLS_DEAD_LOCAL_STORE: Dead store to local variable (olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Wed Oct 28 16:26:58 2009
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -32,6 +33,7 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -120,13 +122,25 @@
keyLookup = lrKeyInfo.second;
}
+ private DataBag createDataBag() {
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ }
+
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ return new NonSpillableDataBag();
+ }
+ return new InternalCachedBag();
+ }
+
@Override
public Result getNext(Tuple t) throws ExecException {
int keyField = -1;
//Create numInputs bags
Object[] fields = new Object[mBags.length];
for (int i = 0; i < mBags.length; i++) {
- if (mBags[i]) fields[i] = new NonSpillableDataBag();
+ if (mBags[i]) fields[i] = createDataBag();
}
// For each indexed tup in the inp, split them up and place their
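The same guard (consult PigMapReduce.sJobConf, fall back to the new bag)
repeats in PODistinct, POSort, and Distinct below. A self-contained sketch of
the selection logic with the Pig types stubbed out as strings, so it runs
standalone; only the property name and the "default" sentinel come from the
patch:

    import java.util.Properties;

    public class BagSelectionSketch {
        // mirrors createDataBag(): the old non-spillable bag only on explicit request
        static String pickBag(Properties jobConf) {
            String bagType = (jobConf == null) ? null
                    : jobConf.getProperty("pig.cachedbag.type");
            if (bagType != null && bagType.equalsIgnoreCase("default")) {
                return "NonSpillableDataBag";
            }
            return "InternalCachedBag";
        }

        public static void main(String[] args) {
            Properties conf = new Properties();
            System.out.println(pickBag(conf));       // InternalCachedBag (new default)
            conf.setProperty("pig.cachedbag.type", "default");
            System.out.println(pickBag(conf));       // NonSpillableDataBag (old behavior)
        }
    }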
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Wed Oct 28 16:26:58 2009
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,8 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -73,9 +76,21 @@
@Override
public Result getNext(Tuple t) throws ExecException {
- if (!inputsAccumulated) {
- Result in = processInput();
- distinctBag = BagFactory.getInstance().newDistinctBag();
+ if (!inputsAccumulated) {
+ Result in = processInput();
+
+ // by default, we create InternalDistinctBag, unless the user explicitly
+ // configures the old bag
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");
+ }
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ distinctBag = BagFactory.getInstance().newDistinctBag();
+ } else {
+ distinctBag = new InternalDistinctBag(3);
+ }
+
while (in.returnStatus != POStatus.STATUS_EOP) {
if (in.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from inputs");
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Oct 28 16:26:58 2009
@@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -38,6 +39,8 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
@@ -255,9 +258,21 @@
@Override
public Result getNext(Tuple t) throws ExecException {
Result res = new Result();
+
if (!inputsAccumulated) {
- res = processInput();
- sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
+ res = processInput();
+ // by default, we create InternalSortedBag, unless the user explicitly
+ // configures the old bag
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
+ }
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
+ } else {
+ sortedBag = new InternalSortedBag(3, mComparator);
+ }
+
while (res.returnStatus != POStatus.STATUS_EOP) {
if (res.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from the inputs");
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java Wed Oct 28 16:26:58 2009
@@ -23,8 +23,10 @@
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalDistinctBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.WrappedIOException;
@@ -114,8 +116,23 @@
}
}
+ static private DataBag createDataBag() {
+ // by default, we create InternalDistinctBag, unless the user explicitly
+ // configures the old bag
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");
+ }
+
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ return BagFactory.getInstance().newDistinctBag();
+ } else {
+ return new InternalDistinctBag(3);
+ }
+ }
+
static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
- DataBag result = bagFactory.newDistinctBag();
+ DataBag result = createDataBag();
long progressCounter = 0;
try {
DataBag bg = (DataBag)input.get(0);
@@ -140,7 +157,7 @@
protected DataBag getDistinct(Tuple input) throws IOException {
try {
DataBag inputBg = (DataBag)input.get(0);
- DataBag result = bagFactory.newDistinctBag();
+ DataBag result = createDataBag();
long progressCounter = 0;
for (Tuple tuple : inputBg) {
result.add(tuple);
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Wed Oct 28 16:26:58 2009
@@ -136,6 +136,16 @@
used *= numInMem;
}
+ // add up the overhead for this object, mContents object, references to tuples,
+ // and other object variables
+ used += 12 + 12 + numInMem*4 + 8 + 4 + 8;
+
+ // add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList,
+ // object variable inside ArrayList and references to spill files
+ if (mSpillFiles != null) {
+ used += 12 + 12 + 4 + mSpillFiles.size()*4;
+ }
+
mMemSize = used;
mMemSizeChanged = false;
return used;
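As a worked example under these per-object estimates: a purely in-memory bag
holding 1000 tuples now charges 12 + 12 + 1000*4 + 8 + 4 + 8 = 4044 bytes of
bookkeeping on top of the tuples themselves, and a bag that has also produced
5 spill files charges a further 12 + 12 + 4 + 5*4 = 48 bytes.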
@@ -181,19 +191,20 @@
// of it so I can guarantee order.
DataBag thisClone;
DataBag otherClone;
- if (this instanceof SortedDataBag ||
- this instanceof DistinctDataBag) {
+ BagFactory factory = BagFactory.getInstance();
+
+ if (this.isSorted() || this.isDistinct()) {
thisClone = this;
} else {
- thisClone = new SortedDataBag(null);
+ thisClone = factory.newSortedBag(null);
Iterator<Tuple> i = iterator();
while (i.hasNext()) thisClone.add(i.next());
+
}
- if (other instanceof SortedDataBag ||
- other instanceof DistinctDataBag) {
+ if (((DataBag) other).isSorted() || ((DataBag)other).isDistinct()) {
otherClone = bOther;
} else {
- otherClone = new SortedDataBag(null);
+ otherClone = factory.newSortedBag(null);
Iterator<Tuple> i = bOther.iterator();
while (i.hasNext()) otherClone.add(i.next());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Wed Oct 28 16:26:58 2009
@@ -183,7 +183,10 @@
*/
public long getMemorySize() {
Iterator<Object> i = mFields.iterator();
- long sum = 0;
+ // initial memory overhead for Tuple object, ArrayList object
+ // and Object[] inside ArrayList, plus references to each tuple field,
+ // plus other object variables
+ long sum = 12*3 + mFields.size()*4 + 8;
while (i.hasNext()) {
sum += getFieldMemorySize(i.next());
}
@@ -307,12 +310,12 @@
case DataType.TUPLE: {
Tuple t = (Tuple)o;
- return t.getMemorySize() + 12;
+ return t.getMemorySize();
}
case DataType.BAG: {
DataBag b = (DataBag)o;
- return b.getMemorySize() + 12;
+ return b.getMemorySize();
}
case DataType.INTEGER:
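A flat example of the relocated accounting: with the constant now charged
inside getMemorySize() itself rather than 12 bytes per nested tuple or bag at
the call site, a DefaultTuple with two fields starts from 12*3 + 2*4 + 8 = 52
bytes of overhead before the fields' own sizes are added in.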
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Wed Oct 28 16:26:58 2009
@@ -19,6 +19,7 @@
import java.io.*;
import java.util.*;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -41,7 +42,7 @@
}
public InternalCachedBag(int bagCount) {
- float percent = 0.5F;
+ float percent = 0.1F;
if (PigMapReduce.sJobConf != null) {
String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
@@ -82,12 +83,14 @@
if(mContents.size() < cacheLimit) {
mMemSizeChanged = true;
- mContents.add(t);
+ mContents.add(t);
if(mContents.size() < 100)
{
memUsage += t.getMemorySize();
long avgUsage = memUsage / (long)mContents.size();
- cacheLimit = (int)(maxMemUsage / avgUsage);
+ if (avgUsage > 0) {
+ cacheLimit = (int)(maxMemUsage / avgUsage);
+ }
}
} else {
try {
@@ -107,6 +110,20 @@
mSize++;
}
+ public void addAll(DataBag b) {
+ Iterator<Tuple> iter = b.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ public void addAll(Collection<Tuple> c) {
+ Iterator<Tuple> iter = c.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
private void addDone() {
if(out != null) {
try {
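The avgUsage guard added above matters when tuples report a zero memory size
(the new TestDataBag case below adds empty tuples); without it the integer
division would throw. A standalone sketch of the guarded recomputation, with
made-up tuple sizes:

    public class CacheLimitSketch {
        public static void main(String[] args) {
            long maxMemUsage = 10L * 1024 * 1024;  // 10 MB budget for this bag
            long memUsage = 0;
            int cacheLimit = Integer.MAX_VALUE;
            long[] tupleSizes = {0, 0, 52, 52};    // empty tuples may report 0

            for (int n = 1; n <= tupleSizes.length; n++) {
                memUsage += tupleSizes[n - 1];
                long avgUsage = memUsage / n;
                if (avgUsage > 0) {                // the guard added in this patch
                    cacheLimit = (int) (maxMemUsage / avgUsage);
                }
                System.out.println(n + " tuples -> cacheLimit " + cacheLimit);
            }
        }
    }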
Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=830664&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Wed Oct 28 16:26:58 2009
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+
+/**
+ * An unordered collection of Tuples with no multiples. Data is
+ * stored without duplicates as it comes in. When it is time to spill,
+ * that data is sorted and written to disk. The data is
+ * stored in a HashSet. When it is time to sort it is placed in an
+ * ArrayList and then sorted. Despite all these machinations, this was
+ * found to be faster than storing it in a TreeSet.
+ *
+ * This bag spills proactively when the number of tuples in memory
+ * reaches a limit.
+ */
+public class InternalDistinctBag extends DefaultAbstractBag {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2L;
+
+ private static final Log log = LogFactory.getLog(InternalDistinctBag.class);
+
+ private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
+ private transient boolean mReadStarted = false;
+
+ private transient int cacheLimit;
+ private transient long maxMemUsage;
+ private transient long memUsage;
+
+ public InternalDistinctBag() {
+ this(1, -1.0);
+ }
+
+ public InternalDistinctBag(int bagCount) {
+ this(bagCount, -1.0);
+ }
+
+ public InternalDistinctBag(int bagCount, double percent) {
+ if (percent < 0) {
+ percent = 0.1F;
+ if (PigMapReduce.sJobConf != null) {
+ String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (usage != null) {
+ percent = Float.parseFloat(usage);
+ }
+ }
+ }
+
+ init(bagCount, percent);
+ }
+
+ private void init(int bagCount, double percent) {
+ mContents = new HashSet<Tuple>();
+
+ long max = Runtime.getRuntime().maxMemory();
+ maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+ cacheLimit = Integer.MAX_VALUE;
+
+ // set the limit to 0 if memusage is 0 or really small;
+ // then all tuples are written straight to disk
+ if (maxMemUsage < 1) {
+ cacheLimit = 0;
+ }
+ }
+
+ public boolean isSorted() {
+ return false;
+ }
+
+ public boolean isDistinct() {
+ return true;
+ }
+
+
+ public long size() {
+ if (mSpillFiles != null && mSpillFiles.size() > 0){
+ // We need to recalculate the size to guarantee a count of unique
+ // entries, including those on disk
+ Iterator<Tuple> iter = iterator();
+ int newSize = 0;
+ while (iter.hasNext()) {
+ newSize++;
+ iter.next();
+ }
+
+ mSize = newSize;
+ }
+ return mSize;
+ }
+
+
+ public Iterator<Tuple> iterator() {
+ return new DistinctDataBagIterator();
+ }
+
+ @Override
+ public void add(Tuple t) {
+
+ if(mReadStarted) {
+ throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
+ }
+
+ if (mContents.size() > cacheLimit) {
+ spill();
+ }
+
+ if (mContents.add(t)) {
+ mMemSizeChanged = true;
+ mSize ++;
+
+ // check how many tuples memory can hold by getting average
+ // size of first 100 tuples
+ if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
+ memUsage += t.getMemorySize();
+ long avgUsage = memUsage / (long)mContents.size();
+ if (avgUsage > 0) {
+ cacheLimit = (int)(maxMemUsage / avgUsage);
+ log.debug("Memory can hold "+ cacheLimit + " records.");
+ }
+ }
+ }
+ }
+
+ public void addAll(DataBag b) {
+ Iterator<Tuple> iter = b.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ public void addAll(Collection<Tuple> c) {
+ Iterator<Tuple> iter = c.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ public long spill() {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ // Lock the container before I spill, so that iterators aren't
+ // trying to read while I'm mucking with the container.
+ long spilled = 0;
+
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added, as File.createTempFile threw an IOException
+ warn(
+ "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+ return 0;
+ }
+ try {
+ Tuple[] array = new Tuple[mContents.size()];
+ mContents.toArray(array);
+ Arrays.sort(array);
+ for (int i = 0; i < array.length; i++) {
+ array[i].write(out);
+ spilled++;
+ // This will report progress every 16384 records.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ warn(
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
+ }
+ }
+ mContents.clear();
+ mMemSizeChanged = true;
+ memUsage = 0;
+
+ return spilled;
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag.
+ * Data can be stored in a combination of in memory and on disk.
+ */
+ private class DistinctDataBagIterator implements Iterator<Tuple> {
+
+ private class TContainer implements Comparable<TContainer> {
+ public Tuple tuple;
+ public int fileNum;
+
+ @SuppressWarnings("unchecked")
+ public int compareTo(TContainer other) {
+ return tuple.compareTo(other.tuple);
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof TContainer) {
+ return compareTo((TContainer)obj) == 0;
+ }
+
+ return false;
+ }
+
+ public int hashCode() {
+ return tuple.hashCode();
+ }
+ }
+
+ // We have to buffer a tuple because there's no easy way for next
+ // to tell whether or not there's another tuple available, other
+ // than to read it.
+ private Tuple mBuf = null;
+ private int mMemoryPtr = 0;
+ private TreeSet<TContainer> mMergeTree = null;
+ private ArrayList<DataInputStream> mStreams = null;
+ private int mCntr = 0;
+
+ @SuppressWarnings("unchecked")
+ DistinctDataBagIterator() {
+ // If this is the first read, we need to sort the data.
+ if (!mReadStarted) {
+ preMerge();
+ // We're the first reader, we need to sort the data.
+ // This is in case it gets dumped under us.
+ ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+ Collections.sort(l);
+ mContents = l;
+ mReadStarted = true;
+ }
+ }
+
+ public boolean hasNext() {
+ // See if we can find a tuple. If so, buffer it.
+ mBuf = next();
+ return mBuf != null;
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+ // If there's one in the buffer, use that one.
+ if (mBuf != null) {
+ Tuple t = mBuf;
+ mBuf = null;
+ return t;
+ }
+
+ // Check to see if we just need to read from memory.
+ if (mSpillFiles == null || mSpillFiles.size() == 0) {
+ return readFromMemory();
+ }
+
+ // We have spill files, so we need to read the next tuple from
+ // one of those files or from memory.
+ return readFromTree();
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void remove() {}
+
+ private Tuple readFromTree() {
+ if (mMergeTree == null) {
+ // First read, we need to set up the queue and the array of
+ // file streams
+ mMergeTree = new TreeSet<TContainer>();
+
+ // Add one to the size in case we spill later.
+ mStreams =
+ new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+ Iterator<File> i = mSpillFiles.iterator();
+ while (i.hasNext()) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ // Add the first tuple from this file into the
+ // merge queue.
+ addToQueue(null, mStreams.size() - 1);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, fnfe);
+ throw new RuntimeException(msg, fnfe);
+ }
+ }
+
+ // Prime one from memory too
+ if (mContents.size() > 0) {
+ addToQueue(null, -1);
+ }
+ }
+
+ if (mMergeTree.size() == 0) return null;
+
+ // Pop the top one off the queue
+ TContainer c = mMergeTree.first();
+ mMergeTree.remove(c);
+
+ // Add the next tuple from wherever we read from into the
+ // queue. Buffer the tuple we're returning, as we'll be
+ // reusing c.
+ Tuple t = c.tuple;
+ addToQueue(c, c.fileNum);
+
+ return t;
+ }
+
+ private void addToQueue(TContainer c, int fileNum) {
+ if (c == null) {
+ c = new TContainer();
+ }
+ c.fileNum = fileNum;
+
+ if (fileNum == -1) {
+ // Need to read from memory.
+ do {
+ c.tuple = readFromMemory();
+ if (c.tuple != null) {
+ // If we find a unique entry, then add it to the queue.
+ // Otherwise ignore it and keep reading.
+ if (mMergeTree.add(c)) {
+ return;
+ }
+ }
+ } while (c.tuple != null);
+ return;
+ }
+
+ // Read the next tuple from the indicated file
+ DataInputStream in = mStreams.get(fileNum);
+ if (in != null) {
+ // There's still data in this file
+ c.tuple = gTupleFactory.newTuple();
+ do {
+ try {
+ c.tuple.readFields(in);
+ // If we find a unique entry, then add it to the queue.
+ // Otherwise ignore it and keep reading. If we run out
+ // of tuples to read that's fine, we just won't add a
+ // new one from this file.
+ if (mMergeTree.add(c)) {
+ return;
+ }
+ } catch (EOFException eof) {
+ // Out of tuples in this file. Set our slot in the
+ // array to null so we don't keep trying to read from
+ // this file.
+ mStreams.set(fileNum, null);
+ return;
+ } catch (IOException ioe) {
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, ioe);
+ throw new RuntimeException(msg, ioe);
+ }
+ } while (true);
+ }
+ }
+
+ // Function assumes that the reader lock is already held before we enter
+ // this function.
+ private Tuple readFromMemory() {
+ if (mContents.size() == 0) return null;
+
+ if (mMemoryPtr < mContents.size()) {
+ return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Pre-merge if there are too many spill files. This avoids the issue
+ * of having too large a fan out in our merge. Experimentation by
+ * the hadoop team has shown that 100 is about the optimal number
+ * of spill files. This function modifies the mSpillFiles array
+ * and assumes the write lock is already held. It will not unlock it.
+ *
+ * Tuples are reconstituted as tuples, evaluated, and rewritten as
+ * tuples. This is expensive, but I don't know how to read tuples
+ * from the file otherwise.
+ *
+ * This function is slightly different than the one in
+ * SortedDataBag, as it uses a TreeSet instead of a PriorityQ.
+ */
+ private void preMerge() {
+ if (mSpillFiles == null ||
+ mSpillFiles.size() <= MAX_SPILL_FILES) {
+ return;
+ }
+
+ // While there are more than max spill files, gather max spill
+ // files together and merge them into one file. Then remove the others
+ // from mSpillFiles. The new spill files are attached at the
+ // end of the list, so I can just keep going until I get a
+ // small enough number without too much concern over uneven
+ // size merges. Convert mSpillFiles to a linked list since
+ // we'll be removing pieces from the middle and we want to do
+ // it efficiently.
+ try {
+ LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ while (ll.size() > MAX_SPILL_FILES) {
+ ListIterator<File> i = ll.listIterator();
+ mStreams =
+ new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+ mMergeTree = new TreeSet<TContainer>();
+
+ for (int j = 0; j < MAX_SPILL_FILES; j++) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ addToQueue(null, mStreams.size() - 1);
+ i.remove();
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, fnfe);
+ throw new RuntimeException(msg, fnfe);
+ }
+ }
+
+ // Get a new spill file. This adds one to the end of
+ // the spill files list. So I need to append it to my
+ // linked list as well so that it's still there when I
+ // move my linked list back to the spill files.
+ try {
+ DataOutputStream out = getSpillFile();
+ ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+ Tuple t;
+ while ((t = readFromTree()) != null) {
+ t.write(out);
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, ioe);
+ throw new RuntimeException(msg, ioe);
+ }
+ }
+
+ // Now, move our new list back to the spill files array.
+ mSpillFiles = new ArrayList<File>(ll);
+ } finally {
+ // Reset mStreams and mMerge so that they'll be allocated
+ // properly for regular merging.
+ mStreams = null;
+ mMergeTree = null;
+ }
+ }
+ }
+}
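A usage sketch for the new bag, in the spirit of the cases added to
TestDataBag below; the class names are real, the driver around them is
illustrative. Duplicates collapse even across explicit spills:

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.InternalDistinctBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class DistinctBagSketch {
        public static void main(String[] args) throws ExecException {
            TupleFactory tf = TupleFactory.getInstance();
            InternalDistinctBag bag = new InternalDistinctBag(3);
            for (int round = 0; round < 2; round++) {
                for (String v : new String[] {"a", "b", "a"}) {
                    Tuple t = tf.newTuple(1);
                    t.set(0, v);
                    bag.add(t);     // the HashSet drops in-memory duplicates here
                }
                bag.spill();        // a sorted run goes to disk each round
            }
            // the merge-read deduplicates across the two spill files as well
            System.out.println(bag.size());     // prints 2
        }
    }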
Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=830664&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Wed Oct 28 16:26:58 2009
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+/**
+ * An ordered collection of Tuples (possibly) with multiples. Data is
+ * stored unsorted as it comes in, and only sorted when it is time to dump
+ * it to a file or when the first iterator is requested. Experimentation
+ * found this to be faster than storing it sorted to begin with.
+ *
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
+ *
+ * This bag is not registered with SpillableMemoryManager. It calculates
+ * the number of tuples to hold in memory and spills proactively to files.
+ */
+public class InternalSortedBag extends DefaultAbstractBag {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2L;
+
+ private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
+ private static final Log log = LogFactory.getLog(InternalSortedBag.class);
+
+ private transient Comparator<Tuple> mComp;
+ private transient boolean mReadStarted = false;
+
+ private transient int cacheLimit;
+ private transient long maxMemUsage;
+ private transient long memUsage;
+
+ static private class DefaultComparator implements Comparator<Tuple> {
+ @SuppressWarnings("unchecked")
+ public int compare(Tuple t1, Tuple t2) {
+ return t1.compareTo(t2);
+ }
+
+ public boolean equals(Object o) {
+ return (o == this);
+ }
+
+ }
+
+ public InternalSortedBag() {
+ this(null);
+ }
+
+ public InternalSortedBag(Comparator<Tuple> comp) {
+ this(1, comp);
+ }
+
+ public InternalSortedBag(int bagCount, Comparator<Tuple> comp) {
+ this(bagCount, -1.0, comp);
+ }
+
+ public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
+ if (percent < 0) {
+ percent = 0.1F;
+ if (PigMapReduce.sJobConf != null) {
+ String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (usage != null) {
+ percent = Float.parseFloat(usage);
+ }
+ }
+ }
+
+ init(bagCount, percent, comp);
+ }
+
+ /**
+ * @param comp Comparator to use to do the sorting. If null,
+ * DefaultComparator will be used.
+ */
+ private void init(int bagCount, double percent, Comparator<Tuple> comp) {
+ mComp = (comp == null) ? new DefaultComparator() : comp;
+
+
+ mContents = new ArrayList<Tuple>();
+
+ long max = Runtime.getRuntime().maxMemory();
+ maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+ cacheLimit = Integer.MAX_VALUE;
+
+ // set the limit to 0 if memusage is 0 or really small;
+ // then all tuples are written straight to disk
+ if (maxMemUsage < 1) {
+ cacheLimit = 0;
+ }
+ }
+
+ public void add(Tuple t) {
+ if(mReadStarted) {
+ throw new IllegalStateException("InternalSortedBag is closed for adding new tuples");
+ }
+
+ if (mContents.size() > cacheLimit) {
+ spill();
+ }
+
+ mMemSizeChanged = true;
+ mContents.add(t);
+
+ // check how many tuples memory can hold by getting average
+ // size of first 100 tuples
+ if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty()))
+ {
+ memUsage += t.getMemorySize();
+ long avgUsage = memUsage / (long)mContents.size();
+ if (avgUsage > 0) {
+ cacheLimit = (int)(maxMemUsage / avgUsage);
+ }
+ }
+
+ mSize++;
+ }
+
+ public void addAll(DataBag b) {
+ Iterator<Tuple> iter = b.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ public void addAll(Collection<Tuple> c) {
+ Iterator<Tuple> iter = c.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ public boolean isSorted() {
+ return true;
+ }
+
+ public boolean isDistinct() {
+ return false;
+ }
+
+ public Iterator<Tuple> iterator() {
+ return new SortedDataBagIterator();
+ }
+
+ public long spill() {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ // Lock the container before I spill, so that iterators aren't
+ // trying to read while I'm mucking with the container.
+ long spilled = 0;
+
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added, as File.createTempFile threw an IOException
+ warn(
+ "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+ return 0;
+ }
+ try {
+
+ Collections.sort((ArrayList<Tuple>)mContents, mComp);
+
+ Iterator<Tuple> i = mContents.iterator();
+ while (i.hasNext()) {
+ i.next().write(out);
+ spilled++;
+ // This will report progress every 16384 records.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ warn(
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
+ }
+ }
+ mContents.clear();
+ mMemSizeChanged = true;
+ memUsage = 0;
+
+ return spilled;
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag.
+ * Data can be stored in a combination of in memory and on disk.
+ */
+ private class SortedDataBagIterator implements Iterator<Tuple> {
+
+ /**
+ * A container to hold tuples in a priority queue. Stores the
+ * file number the tuple came from, so that when the tuple is read
+ * out of the queue, we know which file to read its replacement
+ * tuple from.
+ */
+ private class PQContainer implements Comparable<PQContainer> {
+ public Tuple tuple;
+ public int fileNum;
+
+ public int compareTo(PQContainer other) {
+ return mComp.compare(tuple, other.tuple);
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof PQContainer) {
+ return compareTo((PQContainer)obj) == 0;
+ }
+
+ return false;
+ }
+
+ public int hashCode() {
+ return tuple.hashCode();
+ }
+ }
+
+ // We have to buffer a tuple because there's no easy way for next
+ // to tell whether or not there's another tuple available, other
+ // than to read it.
+ private Tuple mBuf = null;
+ private int mMemoryPtr = 0;
+ private PriorityQueue<PQContainer> mMergeQ = null;
+ private ArrayList<DataInputStream> mStreams = null;
+ private int mCntr = 0;
+
+ SortedDataBagIterator() {
+ // If this is the first read, we need to sort the data.
+ if (!mReadStarted) {
+ preMerge();
+ Collections.sort((ArrayList<Tuple>)mContents, mComp);
+ mReadStarted = true;
+ }
+ }
+
+ public boolean hasNext() {
+ // See if we can find a tuple. If so, buffer it.
+ mBuf = next();
+ return mBuf != null;
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+ // If there's one in the buffer, use that one.
+ if (mBuf != null) {
+ Tuple t = mBuf;
+ mBuf = null;
+ return t;
+ }
+
+ if (mSpillFiles == null || mSpillFiles.size() == 0) {
+ return readFromMemory();
+ }
+
+ // We have spill files, so we need to read the next tuple from
+ // one of those files or from memory.
+ return readFromPriorityQ();
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void remove() {}
+
+ private Tuple readFromPriorityQ() {
+ if (mMergeQ == null) {
+ // First read, we need to set up the queue and the array of
+ // file streams
+ // Add one to the size for the list in memory.
+ mMergeQ =
+ new PriorityQueue<PQContainer>(mSpillFiles.size() + 1);
+
+ // Add one to the size in case we spill later.
+ mStreams =
+ new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+ Iterator<File> i = mSpillFiles.iterator();
+ while (i.hasNext()) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ // Add the first tuple from this file into the
+ // merge queue.
+ addToQueue(null, mStreams.size() - 1);
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, fnfe);
+ throw new RuntimeException(msg, fnfe);
+ }
+ }
+
+ // Prime one from memory too
+ if (mContents.size() > 0) {
+ addToQueue(null, -1);
+ }
+ }
+
+ // Pop the top one off the queue
+ PQContainer c = mMergeQ.poll();
+ if (c == null) return null;
+
+ // Add the next tuple from wherever we read from into the
+ // queue. Buffer the tuple we're returning, as we'll be
+ // reusing c.
+ Tuple t = c.tuple;
+ addToQueue(c, c.fileNum);
+
+ return t;
+ }
+
+ private void addToQueue(PQContainer c, int fileNum) {
+ if (c == null) {
+ c = new PQContainer();
+ }
+ c.fileNum = fileNum;
+
+ if (fileNum == -1) {
+ // Need to read from memory.
+ c.tuple = readFromMemory();
+ if (c.tuple != null) {
+ mMergeQ.add(c);
+ }
+ return;
+ }
+
+ // Read the next tuple from the indicated file
+ DataInputStream in = mStreams.get(fileNum);
+ if (in != null) {
+ // There's still data in this file
+ c.tuple = gTupleFactory.newTuple();
+ try {
+ c.tuple.readFields(in);
+ mMergeQ.add(c);
+ } catch (EOFException eof) {
+ // Out of tuples in this file. Set our slot in the
+ // array to null so we don't keep trying to read from
+ // this file.
+ mStreams.set(fileNum, null);
+ } catch (IOException ioe) {
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, ioe);
+ throw new RuntimeException(msg, ioe);
+ }
+
+ }
+ }
+
+ // Function assumes that the reader lock is already held before we enter
+ // this function.
+ private Tuple readFromMemory() {
+ if (mContents.size() == 0) return null;
+
+ if (mMemoryPtr < mContents.size()) {
+ return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Pre-merge if there are too many spill files. This avoids the issue
+ * of having too large a fan out in our merge. Experimentation by
+ * the hadoop team has shown that 100 is about the optimal number
+ * of spill files. This function modifies the mSpillFiles array
+ * and assumes the write lock is already held. It will not unlock it.
+ *
+ * Tuples are reconstituted as tuples, evaluated, and rewritten as
+ * tuples. This is expensive, but I need to do this in order to
+ * use the sort spec that was provided to me.
+ */
+ private void preMerge() {
+ if (mSpillFiles == null ||
+ mSpillFiles.size() <= MAX_SPILL_FILES) {
+ return;
+ }
+
+ // While there are more than max spill files, gather max spill
+ // files together and merge them into one file. Then remove the others
+ // from mSpillFiles. The new spill files are attached at the
+ // end of the list, so I can just keep going until I get a
+ // small enough number without too much concern over uneven
+ // size merges. Convert mSpillFiles to a linked list since
+ // we'll be removing pieces from the middle and we want to do
+ // it efficiently.
+ try {
+ LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+ while (ll.size() > MAX_SPILL_FILES) {
+ ListIterator<File> i = ll.listIterator();
+ mStreams =
+ new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+ mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);
+
+ for (int j = 0; j < MAX_SPILL_FILES; j++) {
+ try {
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(
+ new FileInputStream(i.next())));
+ mStreams.add(in);
+ addToQueue(null, mStreams.size() - 1);
+ i.remove();
+ } catch (FileNotFoundException fnfe) {
+ // We can't find our own spill file? That should
+ // never happen.
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, fnfe);
+ throw new RuntimeException(msg, fnfe);
+ }
+ }
+
+ // Get a new spill file. This adds one to the end of
+ // the spill files list. So I need to append it to my
+ // linked list as well so that it's still there when I
+ // move my linked list back to the spill files.
+ try {
+ DataOutputStream out = getSpillFile();
+ ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+ Tuple t;
+ while ((t = readFromPriorityQ()) != null) {
+ t.write(out);
+ }
+ out.flush();
+ } catch (IOException ioe) {
+ String msg = "Unable to find our spill file.";
+ log.fatal(msg, ioe);
+ throw new RuntimeException(msg, ioe);
+ }
+ }
+
+ // Now, move our new list back to the spill files array.
+ mSpillFiles = new ArrayList<File>(ll);
+ } finally {
+ // Reset mStreams and mMerge so that they'll be allocated
+ // properly for regular merging.
+ mStreams = null;
+ mMergeQ = null;
+ }
+ }
+ }
+}
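And a matching sketch for the sorted bag; on first read it sorts what is in
memory and merges it with any spilled runs, yielding comparator order (again
the driver is illustrative, the bag API is from this patch):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.InternalSortedBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class SortedBagSketch {
        public static void main(String[] args) throws ExecException {
            TupleFactory tf = TupleFactory.getInstance();
            // a null comparator selects the bag's DefaultComparator
            InternalSortedBag bag = new InternalSortedBag(3, null);
            for (String v : new String[] {"e", "a", "c"}) {
                Tuple t = tf.newTuple(1);
                t.set(0, v);
                bag.add(t);
            }
            bag.spill();            // force a sorted run out to disk
            Tuple late = tf.newTuple(1);
            late.set(0, "b");
            bag.add(late);          // stays in memory until the read starts
            for (Tuple t : bag) {   // merged order: a, b, c, e
                System.out.println(t);
            }
        }
    }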
Modified: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Wed Oct 28 16:26:58 2009
@@ -208,17 +208,18 @@
// same tuples, regardless of order. Hopefully most of the
// time the size check above will prevent this.
// If either bag isn't already sorted, create a sorted bag out
- // of it so I can guarantee order.
+ // of it so I can guarantee order.
+ BagFactory factory = BagFactory.getInstance();
+
DataBag thisClone;
DataBag otherClone;
- thisClone = new SortedDataBag(null);
+ thisClone = factory.newSortedBag(null);
Iterator<Tuple> i = iterator();
while (i.hasNext()) thisClone.add(i.next());
- if (other instanceof SortedDataBag ||
- other instanceof DistinctDataBag) {
+ if (((DataBag) other).isSorted() || ((DataBag) other).isDistinct()) {
otherClone = bOther;
} else {
- otherClone = new SortedDataBag(null);
+ otherClone = factory.newSortedBag(null);
i = bOther.iterator();
while (i.hasNext()) otherClone.add(i.next());
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Wed Oct 28 16:26:58 2009
@@ -24,6 +24,7 @@
import org.apache.pig.data.*;
import org.apache.pig.impl.util.Spillable;
+
/**
* This class will exercise the basic Pig data model and members. It tests for proper behavior in
* assignment and comparison, as well as function application.
@@ -75,6 +76,12 @@
}
}
+ protected void tearDown() throws Exception {
+ BagFactory.resetSelf();
+ System.clearProperty("pig.data.bag.factory.name");
+ System.clearProperty("pig.data.bag.factory.jar");
+ }
+
// Test reading and writing default from memory, no spills.
@Test
public void testDefaultInMemory() throws Exception {
@@ -672,14 +679,14 @@
@Test
public void testDefaultBagFactory() throws Exception {
BagFactory f = BagFactory.getInstance();
-
+
DataBag bag = f.newDefaultBag();
DataBag sorted = f.newSortedBag(null);
DataBag distinct = f.newDistinctBag();
assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
- assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+ assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
}
@Test
@@ -792,6 +799,12 @@
}
public void testInternalCachedBag() throws Exception {
+ // check adding empty tuple
+ DataBag bg0 = new InternalCachedBag();
+ bg0.add(TupleFactory.getInstance().newTuple());
+ bg0.add(TupleFactory.getInstance().newTuple());
+ assertEquals(bg0.size(), 2);
+
// check equal of bags
DataBag bg1 = new InternalCachedBag(1, 0.5f);
assertEquals(bg1.size(), 0);
@@ -846,6 +859,229 @@
bg4.clear();
assertEquals(bg4.size(), 0);
}
+
+ public void testInternalSortedBag() throws Exception {
+
+ // check adding empty tuple
+ DataBag bg0 = new InternalSortedBag();
+ bg0.add(TupleFactory.getInstance().newTuple());
+ bg0.add(TupleFactory.getInstance().newTuple());
+ assertEquals(bg0.size(), 2);
+
+ // check equal of bags
+ DataBag bg1 = new InternalSortedBag();
+ assertEquals(bg1.size(), 0);
+
+ String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg1.add(Util.createTuple(tupleContents[i]));
+ }
+
+ // check size, and isSorted(), isDistinct()
+ assertEquals(bg1.size(), 3);
+ assertTrue(bg1.isSorted());
+ assertFalse(bg1.isDistinct());
+
+ tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
+ DataBag bg2 = new InternalSortedBag();
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg2.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg1, bg2);
+
+ Iterator<Tuple> iter = bg1.iterator();
+ iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+ iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
+ // check bag with data written to disk
+ DataBag bg3 = new InternalSortedBag(1, 0.0, null);
+ tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg3.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg1, bg3);
+
+ iter = bg3.iterator();
+ iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+ iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
+ // call iterator methods with irregular order
+ iter = bg3.iterator();
+ assertTrue(iter.hasNext());
+ assertTrue(iter.hasNext());
+
+ DataBag bg4 = new InternalSortedBag(1, 0.0, null);
+ bg4.add(iter.next());
+ bg4.add(iter.next());
+ assertTrue(iter.hasNext());
+ bg4.add(iter.next());
+ assertFalse(iter.hasNext());
+ assertFalse(iter.hasNext());
+ assertEquals(bg3, bg4);
+
+ // check clear
+ bg3.clear();
+ assertEquals(bg3.size(), 0);
+
+ // test with all data spill out
+ DataBag bg5 = new InternalSortedBag();
+ for(int j=0; j<3; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg5.add(Util.createTuple(tupleContents[i]));
+ }
+ bg5.spill();
+ }
+
+ assertEquals(bg5.size(), 9);
+ iter = bg5.iterator();
+ for(int i=0; i<3; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+ }
+ for(int i=0; i<3; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
+ }
+ for(int i=0; i<3; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ }
+
+ // test with most data spill out, with some data in memory
+ // and merge of spill files
+ DataBag bg6 = new InternalSortedBag();
+ for(int j=0; j<104; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg6.add(Util.createTuple(tupleContents[i]));
+ }
+ if (j != 103) {
+ bg6.spill();
+ }
+ }
+
+ assertEquals(bg6.size(), 104*3);
+ iter = bg6.iterator();
+ for(int i=0; i<104; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+ }
+ for(int i=0; i<104; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
+ }
+ for(int i=0; i<104; i++) {
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ }
+
+ // check that the two implementations of sorted bag compare correctly
+ DataBag bg7 = new SortedDataBag(null);
+ for(int j=0; j<104; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg7.add(Util.createTuple(tupleContents[i]));
+ }
+ if (j != 103) {
+ bg7.spill();
+ }
+ }
+ assertEquals(bg6, bg7);
+ }
+
+ public void testInternalDistinctBag() throws Exception {
+ // check adding empty tuple
+ DataBag bg0 = new InternalDistinctBag();
+ bg0.add(TupleFactory.getInstance().newTuple());
+ bg0.add(TupleFactory.getInstance().newTuple());
+ assertEquals(bg0.size(), 1);
+
+ // check equal of bags
+ DataBag bg1 = new InternalDistinctBag();
+ assertEquals(bg1.size(), 0);
+
+ String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg1.add(Util.createTuple(tupleContents[i]));
+ }
+
+ // check size, and isSorted(), isDistinct()
+ assertEquals(bg1.size(), 3);
+ assertFalse(bg1.isSorted());
+ assertTrue(bg1.isDistinct());
+
+ tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
+ DataBag bg2 = new InternalDistinctBag();
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg2.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg1, bg2);
+
+ Iterator<Tuple> iter = bg1.iterator();
+ iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+ iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
+ // check bag with data written to disk
+ DataBag bg3 = new InternalDistinctBag(1, 0.0);
+ tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg3.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg2, bg3);
+ assertEquals(bg3.size(), 3);
+
+
+ // call iterator methods with irregular order
+ iter = bg3.iterator();
+ assertTrue(iter.hasNext());
+ assertTrue(iter.hasNext());
+
+ DataBag bg4 = new InternalDistinctBag(1, 0.0);
+ bg4.add(iter.next());
+ bg4.add(iter.next());
+ assertTrue(iter.hasNext());
+ bg4.add(iter.next());
+ assertFalse(iter.hasNext());
+ assertFalse(iter.hasNext());
+ assertEquals(bg3, bg4);
+
+ // check clear
+ bg3.clear();
+ assertEquals(bg3.size(), 0);
+
+ // test with all data spill out
+ DataBag bg5 = new InternalDistinctBag();
+ for(int j=0; j<3; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg5.add(Util.createTuple(tupleContents[i]));
+ }
+ bg5.spill();
+ }
+
+ assertEquals(bg5.size(), 3);
+
+
+ // test with most data spill out, with some data in memory
+ // and merge of spill files
+ DataBag bg6 = new InternalDistinctBag();
+ for(int j=0; j<104; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg6.add(Util.createTuple(tupleContents[i]));
+ }
+ if (j != 103) {
+ bg6.spill();
+ }
+ }
+
+ assertEquals(bg6.size(), 3);
+
+ // check that the two implementations of distinct bag compare correctly
+ DataBag bg7 = new DistinctDataBag();
+ for(int j=0; j<104; j++) {
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg7.add(Util.createTuple(tupleContents[i]));
+ }
+ if (j != 103) {
+ bg7.spill();
+ }
+ }
+ assertEquals(bg6, bg7);
+ }
}