You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/01/27 22:57:22 UTC
svn commit: r738263 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/data/
test/org/apache/pig/test/
Author: pradeepkth
Date: Tue Jan 27 21:57:21 2009
New Revision: 738263
URL: http://svn.apache.org/viewvc?rev=738263&view=rev
Log:
PIG-636: PERFORMANCE: Use lightweight bag implementations which do not register with SpillableMemoryManager with Combiner
Added:
hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 27 21:57:21 2009
@@ -193,17 +193,17 @@
PIG-422: cross is broken (shravanmn via olgan)
- PIG-407: need to clone operators (pradeepk via olgan)
+ PIG-407: need to clone operators (pradeepkth via olgan)
PIG-428: TypeCastInserter does not replace projects in inner plans
- correctly (pradeepk vi olgan)
+ correctly (pradeepkth vi olgan)
PIG-421: error with complex nested plan (sms via olgan)
PIG-429: Self join wth implicit split has the join output in wrong order
- (pradeepk via olgan)
+ (pradeepkth via olgan)
- PIG-434: short-circuit AND and OR (pradeepk viia olgan)
+ PIG-434: short-circuit AND and OR (pradeepkth viia olgan)
PIG-333: allowing no parethesis with single column alias with flatten (sms
via olgan)
@@ -212,15 +212,15 @@
PIG-426: Adding result of two UDFs gives a syntax error (sms via olgan)
- PIG-436: alias is lost when single column is flattened (pradeepk via
+ PIG-436: alias is lost when single column is flattened (pradeepkth via
olgan)
PIG-364: Limit return incorrect records when we use multiple reducer
(daijy via olgan)
- PIG-439: disallow alias renaming (pardeepk via olgan)
+ PIG-439: disallow alias renaming (pradeepkth via olgan)
- PIG-440: Exceptions from UDFs inside a foreach are not captured (pradeepk
+ PIG-440: Exceptions from UDFs inside a foreach are not captured (pradeepkth
via olgan)
PIG-442: Disambiguated alias after a foreach flatten is not accessible a
@@ -238,18 +238,18 @@
PIG-445: Null Pointer Exceptions in the mappers leading to lot of retries
(shravanmn via olgan)
- PIG-444: job.jar is left behined (pradeepk via olgan)
+ PIG-444: job.jar is left behined (pradeepkth via olgan)
- PIG-447: improved error messages (pradeepk via olgan)
+ PIG-447: improved error messages (pradeepkth via olgan)
- PIG-448: explain broken after load with types (pradeepk via olgan)
+ PIG-448: explain broken after load with types (pradeepkth via olgan)
PIG-380: invalid schema for databag constant (sms via olgan)
PIG-451: If an field is part of group followed by flatten, then referring
- to it causes a parse error (pradeepk via olgan)
+ to it causes a parse error (pradeepkth via olgan)
- PIG-455: "group" alias is lost after a flatten(group) (pradeepk vi olgan)
+ PIG-455: "group" alias is lost after a flatten(group) (pradeepkth vi olgan)
PIG-458: integration with Hadoop 18 (olgan)
@@ -262,34 +262,34 @@
PIG-376: set job name (olgan)
- PIG-463: POCast changes (pradeepk via olgan)
+ PIG-463: POCast changes (pradeepkth via olgan)
PIG-427: casting input to UDFs
PIG-437: as in alias names causing problems (sms via olgan)
- PIG-54: MIN/MAX don't deal with invalid data (pradeepk via olgan)
+ PIG-54: MIN/MAX don't deal with invalid data (pradeepkth via olgan)
PIG-470: TextLoader should produce bytearrays (sms via olgan)
PIG-335: lineage (sms vi olgan)
- PIG-464: bag schema definition (pradeepk via olgan)
+ PIG-464: bag schema definition (pradeepkth via olgan)
PIG-457: report 100% on successful jobs only (shravanmn via olgan)
- PIG-471: ignoring status errors from hadoop (pradeepk via olgan)
+ PIG-471: ignoring status errors from hadoop (pradeepkth via olgan)
- PIG-465: performance improvement - removing keys from the value (pradeepk
+ PIG-465: performance improvement - removing keys from the value (pradeepkth
via olgan)
PIG-489: (*) processing (sms via olgan)
PIG-475: missing heartbeats (shravanmn via olgan)
- PIG-468: make determine Schema work for BinStorage (pradeepk via olgan)
+ PIG-468: make determine Schema work for BinStorage (pradeepkth via olgan)
- PIG-494: invalid handling of UTF-8 data in PigStorage (pradeepk via olgan)
+ PIG-494: invalid handling of UTF-8 data in PigStorage (pradeepkth via olgan)
PIG-501: Make branches/types work under cygwin (daijy via olgan)
@@ -304,15 +304,15 @@
PIG-499: parser issue with as (sms via olgan)
- PIG-507: permission error not reported (pradeepk via olgan)
+ PIG-507: permission error not reported (pradeepkth via olgan)
- PIG-508: problem with double joins (pradeepk via olgan)
+ PIG-508: problem with double joins (pradeepkth via olgan)
- PIG-497: problems with UTF8 handling in BinStorage (pradeepk via olgan)
+ PIG-497: problems with UTF8 handling in BinStorage (pradeepkth via olgan)
PIG-505: working with map elements (sms via olgan)
- PIG-517: load functiin with parameters does not work with cast (pradeepk
+ PIG-517: load functiin with parameters does not work with cast (pradeepkth
via olgan)
PIG-525: make sure cast for udf parameters works (olgan)
@@ -324,16 +324,16 @@
PIG-527: allow PigStorage to write out complex output (sms via olgan)
PIG-537: Failure in Hadoop map collect stage due to type mismatch in the
- keys used in cogroup (pradeepk vi olgan)
+ keys used in cogroup (pradeepkth vi olgan)
- PIG-538: support for null constants (pradeepk via olgan)
+ PIG-538: support for null constants (pradeepkth via olgan)
- PIG-385: more null handling (pradeepk via olgan)
+ PIG-385: more null handling (pradeepkth via olgan)
PIG-546: FilterFunc calls empty constructor when it should be calling
parameterized constructor (sms via olgan)
- PIG-449: Schemas for bags should contain tuples all the time (pradeepk via
+ PIG-449: Schemas for bags should contain tuples all the time (pradeepkth via
olgan)
PIG-501: make unit tests run under windows (daijy via olgan)
@@ -346,14 +346,14 @@
PIG-6: Add load support from hbase (hustlmsp via gates).
- PIG-522: make negation work (pradeepk via olgan)
+ PIG-522: make negation work (pradeepkth via olgan)
- PIG-563: support for multiple combiner invocations (pradeepk via olgan)
+ PIG-563: support for multiple combiner invocations (pradeepkth via olgan)
- PIG-580: using combiner to compute distinct aggs (pradeepk via olgan)
+ PIG-580: using combiner to compute distinct aggs (pradeepkth via olgan)
PIG-558: Distinct followed by a Join results in Invalid size 0 for a tuple
- error (pradeepk via olgan)
+ error (pradeepkth via olgan)
PIG-572 A PigServer.registerScript() method, which lets a client
programmatically register a Pig Script. (shubhamc via gates)
@@ -364,14 +364,14 @@
PIG-597: Fix for how * is treated by UDFs (shravanmn via olgan)
- PIG-629: performance improvement: getting rid of targeted tuple (pradeepk
+ PIG-629: performance improvement: getting rid of targeted tuple (pradeepkth
via olgan)
PIG-623: Fix spelling errors in output messages (tomwhite via sms)
PIG-622: Include pig executable in distribution (tomwhite via sms)
- PIG-628: misc performance improvements (pradeepk via olgan)
+ PIG-628: misc performance improvements (pradeepkth via olgan)
PIG-589: error handling, phase 1-2 (sms via olgan)
@@ -380,6 +380,9 @@
PIG-635: POCast.java has incorrect formatting (sms)
PIG-634: When POUnion is one of the roots of a map plan, POUnion.getNext()
- gives a null pointer exception (pradeepk)
+ gives a null pointer exception (pradeepkth)
PIG-632: Improved error message for binary operators (sms)
+
+ PIG-636: Performance improvement: Use lightweight bag implementations which do not
+ register with SpillableMemoryManager with Combiner (pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Jan 27 21:57:21 2009
@@ -466,6 +466,18 @@
}
}
+
+ // since we will only be creating SingleTupleBag as input to
+ // the map foreach, we should flag the POProjects in the map
+ // foreach inner plans to also use SingleTupleBag
+ for (PhysicalPlan mpl : mPlans) {
+ try {
+ new fixMapProjects(mpl).visit();
+ } catch (VisitorException e) {
+ throw new PlanException(e);
+ }
+ }
+
// Set flattens for map and combiner ForEach to false
List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
@@ -823,7 +835,47 @@
}
}
+
+ private class fixMapProjects extends PhyPlanVisitor {
+
+ public fixMapProjects(PhysicalPlan plan) {
+ this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ }
+ /**
+ * @param plan
+ * @param walker
+ */
+ public fixMapProjects(PhysicalPlan plan,
+ PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
+ super(plan, walker);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+ */
+ @Override
+ public void visitProject(POProject proj) throws VisitorException {
+ if (proj.getResultType() == DataType.BAG) {
+
+ // IMPORTANT ASSUMPTION:
+ // we should be calling this visitor only for
+ // fixing up the projects in the map's foreach
+ // inner plan. In the map side, we are dealing
+ // with single tuple bags - so set the flag in
+ // the project to use single tuple bags. If in
+ // future we don't have single tuple bags in the
+ // input to map's foreach, we should NOT be doing
+ // this!
+ proj.setResultSingleTupleBag(true);
+
+ }
+ }
+
+ }
// Reset any member variables since we may have already visited one
// combine.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Tue Jan 27 21:57:21 2009
@@ -26,6 +26,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
@@ -54,6 +55,8 @@
private static TupleFactory tupleFactory = TupleFactory.getInstance();
+ private boolean resultSingleTupleBag = false;
+
//The column to project
ArrayList<Integer> columns;
@@ -168,6 +171,7 @@
@Override
public Result getNext(DataBag db) throws ExecException {
+
Result res = processInputBag();
if(res.returnStatus!=POStatus.STATUS_OK)
return res;
@@ -179,12 +183,25 @@
detachInput();
return res;
}
- DataBag outBag = BagFactory.getInstance().newDefaultBag();
- for (Tuple tuple : inpBag) {
+
+ DataBag outBag;
+ if(resultSingleTupleBag) {
+ // we have only one tuple in a bag - so create
+ // A SingleTupleBag for the result and fill it
+ // appropriately from the input bag
+ Tuple tuple = inpBag.iterator().next();
Tuple tmpTuple = tupleFactory.newTuple(columns.size());
for (int i = 0; i < columns.size(); i++)
tmpTuple.set(i, tuple.get(columns.get(i)));
- outBag.add(tmpTuple);
+ outBag = new SingleTupleBag(tmpTuple);
+ } else {
+ outBag = BagFactory.getInstance().newDefaultBag();
+ for (Tuple tuple : inpBag) {
+ Tuple tmpTuple = tupleFactory.newTuple(columns.size());
+ for (int i = 0; i < columns.size(); i++)
+ tmpTuple.set(i, tuple.get(columns.get(i)));
+ outBag.add(tmpTuple);
+ }
}
res.result = outBag;
res.returnStatus = POStatus.STATUS_OK;
@@ -371,4 +388,8 @@
}
}
+ public void setResultSingleTupleBag(boolean resultSingleTupleBag) {
+ this.resultSingleTupleBag = resultSingleTupleBag;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Tue Jan 27 21:57:21 2009
@@ -31,6 +31,7 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
@@ -109,7 +110,7 @@
//Create numInputs bags
Object[] fields = new Object[mBags.length];
for (int i = 0; i < mBags.length; i++) {
- if (mBags[i]) fields[i] = mBagFactory.newDefaultBag();
+ if (mBags[i]) fields[i] = new NonSpillableDataBag();
}
// For each indexed tup in the inp, split them up and place their
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Tue Jan 27 21:57:21 2009
@@ -25,6 +25,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -165,7 +166,6 @@
resLst.add(res);
}
res.result = constructLROutput(resLst,(Tuple)inp.result);
-
return res;
}
return inp;
@@ -190,8 +190,7 @@
// put the value in a bag so that the initial
// version of the Algebraics will get a bag as
// they would expect.
- DataBag bg = mBagFactory.newDefaultBag();
- bg.add(value);
+ DataBag bg = new SingleTupleBag(value);
output.set(1, bg);
return output;
}
Added: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=738263&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Tue Jan 27 21:57:21 2009
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+
+/**
+ * An unordered collection of Tuples (possibly) with multiples. The tuples
+ * are stored in an ArrayList, since there is no concern for order or
+ * distinctness. The implicit assumption is that the user of this class
+ * is storing only those many tuples as will fit in memory - no spilling
+ * will be done on this bag to disk.
+ */
+public class NonSpillableDataBag implements DataBag {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ *
+ */
+ private List<Tuple> mContents;
+
+ public NonSpillableDataBag() {
+ mContents = new ArrayList<Tuple>();
+ }
+
+ /**
+ * This constructor creates a bag out of an existing list
+ * of tuples by taking ownership of the list and NOT
+ * copying the contents of the list.
+ * @param listOfTuples List<Tuple> containing the tuples
+ */
+ public NonSpillableDataBag(List<Tuple> listOfTuples) {
+ mContents = listOfTuples;
+ }
+
+ public boolean isSorted() {
+ return false;
+ }
+
+ public boolean isDistinct() {
+ return false;
+ }
+
+ public Iterator<Tuple> iterator() {
+ return new NonSpillableDataBagIterator();
+ }
+
+ /**
+ * An iterator that handles getting the next tuple from the bag.
+ */
+ private class NonSpillableDataBagIterator implements Iterator<Tuple> {
+
+ private int mCntr = 0;
+
+ public boolean hasNext() {
+ return (mCntr < mContents.size());
+ }
+
+ public Tuple next() {
+ // This will report progress every 1024 times through next.
+ // This should be much faster than using mod.
+ if ((mCntr & 0x3ff) == 0) reportProgress();
+
+ return mContents.get(mCntr++);
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void remove() { throw new RuntimeException("Cannot remove() from NonSpillableDataBag.iterator()");}
+ }
+
+ /**
+ * Report progress to HDFS.
+ */
+ protected void reportProgress() {
+ if (PhysicalOperator.reporter != null) {
+ PhysicalOperator.reporter.progress();
+ }
+ }
+
+ @Override
+ public void add(Tuple t) {
+ mContents.add(t);
+ }
+
+ @Override
+ public void addAll(DataBag b) {
+ for (Tuple t : b) {
+ mContents.add(t);
+ }
+ }
+
+ @Override
+ public void clear() {
+ mContents.clear();
+ }
+
+ @Override
+ public void markStale(boolean stale) {
+ throw new RuntimeException("NonSpillableDataBag cannot be marked stale");
+ }
+
+ @Override
+ public long size() {
+ return mContents.size();
+ }
+
+ @Override
+ public long getMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public long spill() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /**
+ * Write a bag's contents to disk.
+ * @param out DataOutput to write data to.
+ * @throws IOException (passes it on from underlying calls).
+ */
+ public void write(DataOutput out) throws IOException {
+ // We don't care whether this bag was sorted or distinct because
+ // using the iterator to write it will guarantee those things come
+ // correctly. And on the other end there'll be no reason to waste
+ // time re-sorting or re-applying distinct.
+ out.writeLong(size());
+ Iterator<Tuple> it = iterator();
+ while (it.hasNext()) {
+ Tuple item = it.next();
+ item.write(out);
+ }
+ }
+
+ /**
+ * Read a bag from disk.
+ * @param in DataInput to read data from.
+ * @throws IOException (passes it on from underlying calls).
+ */
+ public void readFields(DataInput in) throws IOException {
+ long size = in.readLong();
+
+ for (long i = 0; i < size; i++) {
+ try {
+ Object o = DataReaderWriter.readDatum(in);
+ add((Tuple)o);
+ } catch (ExecException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+ }
+
+ @Override
+ public int compareTo(Object other) {
+ if (this == other)
+ return 0;
+ if (other instanceof DataBag) {
+ DataBag bOther = (DataBag) other;
+ if (this.size() != bOther.size()) {
+ if (this.size() > bOther.size()) return 1;
+ else return -1;
+ }
+
+ Iterator<Tuple> thisIt = this.iterator();
+ Iterator<Tuple> otherIt = bOther.iterator();
+ while (thisIt.hasNext() && otherIt.hasNext()) {
+ Tuple thisT = thisIt.next();
+ Tuple otherT = otherIt.next();
+
+ int c = thisT.compareTo(otherT);
+ if (c != 0) return c;
+ }
+
+ return 0; // if we got this far, they must be equal
+ } else {
+ return DataType.compare(this, other);
+ }
+ }
+
+ /**
+ * Write the bag into a string. */
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append('{');
+ Iterator<Tuple> it = iterator();
+ while ( it.hasNext() ) {
+ Tuple t = it.next();
+ String s = t.toString();
+ sb.append(s);
+ if (it.hasNext()) sb.append(",");
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java?rev=738263&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java Tue Jan 27 21:57:21 2009
@@ -0,0 +1,170 @@
+/**
+ *
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A simple performant implementation of the DataBag
+ * interface which only holds a single tuple. This will
+ * be used from POPreCombinerLocalRearrange and wherever else
+ * a single Tuple non-serializable DataBag is required.
+ */
+public class SingleTupleBag implements DataBag {
+
+ Tuple item;
+
+ public SingleTupleBag(Tuple t) {
+ item = t;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#add(org.apache.pig.data.Tuple)
+ * NOTE: It is the user's responsibility to ensure only a single
+ * Tuple is ever added into a SingleTupleBag
+ */
+ @Override
+ public void add(Tuple t) {
+ item = t;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#addAll(org.apache.pig.data.DataBag)
+ */
+ @Override
+ public void addAll(DataBag b) {
+ throw new RuntimeException("Cannot create SingleTupleBag from another DataBag");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#clear()
+ */
+ @Override
+ public void clear() {
+ throw new RuntimeException("Cannot clear SingleTupleBag");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#isDistinct()
+ */
+ @Override
+ public boolean isDistinct() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#isSorted()
+ */
+ @Override
+ public boolean isSorted() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#iterator()
+ */
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new TBIterator();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#markStale(boolean)
+ */
+ @Override
+ public void markStale(boolean stale) {
+ throw new RuntimeException("SingleTupleBag cannot be marked stale");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.DataBag#size()
+ */
+ @Override
+ public long size() {
+ return 1;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.util.Spillable#getMemorySize()
+ */
+ @Override
+ public long getMemorySize() {
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.util.Spillable#spill()
+ */
+ @Override
+ public long spill() {
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // TODO Auto-generated method stub
+ throw new IOException("SingleTupleBag should never be serialized or serialized");
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODO Auto-generated method stub
+ throw new IOException("SingleTupleBag should never be serialized or deserialized");
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ @Override
+ public int compareTo(Object o) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ class TBIterator implements Iterator<Tuple> {
+ boolean nextDone = false;
+ /* (non-Javadoc)
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext() {
+ return !nextDone;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public Tuple next() {
+ nextDone = true;
+ return item;
+
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ */
+ @Override
+ public void remove() {
+ throw new RuntimeException("SingleTupleBag.iterator().remove() is not allowed");
+ }
+ }
+
+ /**
+ * Write the bag into a string. */
+ @Override
+ public String toString() {
+ return "{" + item + "}";
+ }
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java Tue Jan 27 21:57:21 2009
@@ -17,6 +17,9 @@
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.MultiMap;
import org.junit.Before;
@@ -39,6 +42,35 @@
}
@Test
+ public void testSingleTupleBagAcess() throws Exception {
+ Tuple inputTuple = new DefaultTuple();
+ inputTuple.append("a");
+ inputTuple.append("b");
+
+ SingleTupleBag bg = new SingleTupleBag(inputTuple);
+ Iterator<Tuple> it = bg.iterator();
+ assertEquals(inputTuple, it.next());
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void testNonSpillableDataBag() throws Exception {
+ String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
+ NonSpillableDataBag bg = new NonSpillableDataBag();
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg.add(Util.createTuple(tupleContents[i]));
+ }
+ Iterator<Tuple> it = bg.iterator();
+ int j = 0;
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ assertEquals(Util.createTuple(tupleContents[j]), t);
+ j++;
+ }
+ assertEquals(tupleContents.length, j);
+ }
+
+ @Test
public void testBagConstantAccess() throws IOException, ExecException {
File input = Util.createInputFile("tmp", "",
new String[] {"sampledata\tnot_used"});