You are viewing a plain-text version of this content. The original page linked to the canonical version here, but that hyperlink is not preserved in this text.
Posted to commits@pig.apache.org by ha...@apache.org on 2010/02/08 20:19:38 UTC
svn commit: r907760 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
Author: hashutosh
Date: Mon Feb 8 19:19:37 2010
New Revision: 907760
URL: http://svn.apache.org/viewvc?rev=907760&view=rev
Log:
PIG-1224: Collected group should change to use new (internal) bag
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Feb 8 19:19:37 2010
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-1224: Collected group should change to use new (internal) bag (ashutoshc)
+
PIG-1046: join algorithm specification is within double quotes (ashutoshc)
PIG-1209: Port POJoinPackage to proactively spill (ashutoshc)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Feb 8 19:19:37 2010
@@ -22,6 +22,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,7 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -69,6 +71,8 @@
private Object prevKey = null;
+ private boolean useDefaultBag = false;
+
public POCollectedGroup(OperatorKey k) {
this(k, -1, null);
}
@@ -199,8 +203,24 @@
// the first time, just create a new buffer and continue.
if (prevKey == null && outputBag == null) {
+
+ if (PigMapReduce.sJobConf != null) {
+ String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
prevKey = curKey;
- outputBag = BagFactory.getInstance().newDefaultBag();
+ outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POCollectedGroup in the pipeline and is also blocking the pipeline;
+ // constructor argument should be 2. But for one obscure
+ // case we don't want to pay the penalty all the time.
+
+ // Additionally, if there is a merge join(on a different key) following POCollectedGroup
+ // default bags should be used. But since we don't allow anything
+ // before Merge Join currently we are good.
+ : new InternalCachedBag(1);
outputBag.add((Tuple)tup.get(1));
continue;
}
@@ -224,7 +244,8 @@
res.result = tup2;
prevKey = curKey;
- outputBag = BagFactory.getInstance().newDefaultBag();
+ outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+ : new InternalCachedBag(1);
outputBag.add((Tuple)tup.get(1));
return res;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Mon Feb 8 19:19:37 2010
@@ -25,6 +25,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
@@ -44,6 +45,8 @@
private boolean lastInputTuple = false;
private static final Tuple t1 = null;
private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+ private boolean firstTime = true;
+ private boolean useDefaultBag = false;
public static final String DEFAULT_CHUNK_SIZE = "1000";
@@ -100,6 +103,16 @@
*/
@Override
public Result getNext(Tuple t) throws ExecException {
+
+ if(firstTime){
+ firstTime = false;
+ if (PigMapReduce.sJobConf != null) {
+ String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
// if a previous call to foreach.getNext()
// has still not returned all output, process it
if (forEach.processingPlan)
@@ -126,17 +139,14 @@
{
lastInputTuple = false;
//Put n-1 inputs into bags
- String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
- }
dbs = new DataBag[numInputs];
for (int i = 0; i < numInputs; i++) {
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- dbs[i] = mBagFactory.newDefaultBag();
- } else {
- dbs[i] = new InternalCachedBag(numInputs);
- }
+ dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POJoinPackage in the pipeline and is also blocking the pipeline;
+ // constructor argument should be 2 * numInputs. But for one obscure
+ // case we don't want to pay the penalty all the time.
+ : new InternalCachedBag(numInputs);
}
//For each Nullable tuple in the input, put it
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Feb 8 19:19:37 2010
@@ -115,6 +115,10 @@
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ private boolean firstTime = true;
+
+ private boolean useDefaultBag = false;
public POPackage(OperatorKey k) {
this(k, -1, null);
@@ -211,6 +215,17 @@
@Override
public Result getNext(Tuple t) throws ExecException {
Tuple res;
+
+ if(firstTime){
+ firstTime = false;
+ if (PigMapReduce.sJobConf != null) {
+ String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+
if(distinct) {
// only set the key which has the whole
// tuple
@@ -232,20 +247,14 @@
} else {
// create bag to pull all tuples out of iterator
- String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
- }
-
-
- for (int i = 0; i < numInputs; i++) {
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- dbs[i] = mBagFactory.newDefaultBag();
- } else {
- dbs[i] = new InternalCachedBag(numInputs);
- }
- }
-
+ for (int i = 0; i < numInputs; i++) {
+ dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POPackage in the pipeline and is also blocking the pipeline;
+ // constructor argument should be 2 * numInputs. But for one obscure
+ // case we don't want to pay the penalty all the time.
+ : new InternalCachedBag(numInputs);
+ }
//For each indexed tup in the inp, sort them
//into their corresponding bags based
//on the index