You are viewing a plain-text version of this message. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by ju...@apache.org on 2012/12/08 00:18:47 UTC
svn commit: r1418553 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java src/org/apache/pig/impl/plan/Operator.java
Author: julien
Date: Fri Dec 7 23:18:46 2012
New Revision: 1418553
URL: http://svn.apache.org/viewvc?rev=1418553&view=rev
Log:
PIG-3084: Improve exceptions messages in POPackage
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/impl/plan/Operator.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec 7 23:18:46 2012
@@ -58,6 +58,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3084: Improve exceptions messages in POPackage (julien)
+
PIG-3072: Pig job reporting negative progress (knoguchi via rohini)
PIG-3014: CurrentTime() UDF has undesirable characteristics (jcoveney via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Dec 7 23:18:46 2012
@@ -55,7 +55,7 @@ import org.apache.pig.pen.util.LineageTr
* This is last stage of processing co-group.
* This operator has a slightly different
* format than other operators in that, it
- * takes two things as input. The key being
+ * takes two things as input. The key being
* worked on and the iterator of bags that
* contain indexed tuples that just need to
* be packaged into their appropriate output
@@ -63,13 +63,13 @@ import org.apache.pig.pen.util.LineageTr
*/
public class POPackage extends PhysicalOperator {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
-
+
public static enum PackageType { GROUP, JOIN };
-
+
//The iterator of indexed Tuples
//that is typically provided by
//Hadoop
@@ -77,14 +77,14 @@ public class POPackage extends PhysicalO
//The key being worked on
Object key;
-
+
// marker to indicate if key is a tuple
protected boolean isKeyTuple = false;
// marker to indicate if the tuple key is compound in nature
protected boolean isKeyCompound = false;
// key as a Tuple object (if the key is a tuple)
protected Tuple keyAsTuple;
-
+
//key's type
byte keyType;
@@ -92,33 +92,33 @@ public class POPackage extends PhysicalO
//co-group. 0 indicates a distinct, which means there will only be a
//key, no value.
int numInputs;
-
+
// If the attaching map-reduce plan use secondary sort key
boolean useSecondaryKey = false;
-
+
//Denotes if inner is specified
//on a particular input
boolean[] inner;
-
+
// flag to denote whether there is a distinct
// leading to this package
protected boolean distinct = false;
-
+
// A mapping of input index to key information got from LORearrange
// for that index. The Key information is a pair of boolean, Map.
- // The boolean indicates whether there is a lone project(*) in the
- // cogroup by. If not, the Map has a mapping of column numbers in the
+ // The boolean indicates whether there is a lone project(*) in the
+ // cogroup by. If not, the Map has a mapping of column numbers in the
// "value" to column numbers in the "key" which contain the fields in
// the "value"
protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
-
+
private boolean firstTime = true;
-
+
private boolean useDefaultBag = false;
-
+
private PackageType pkgType;
public POPackage(OperatorKey k) {
@@ -161,7 +161,7 @@ public class POPackage extends PhysicalO
public boolean supportsMultipleOutputs() {
return false;
}
-
+
/**
* Attaches the required inputs
* @param k - the key being worked on
@@ -169,21 +169,22 @@ public class POPackage extends PhysicalO
* obtained from Hadoop
*/
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
- tupIter = inp;
- key = k.getValueAsPigType();
- if (useSecondaryKey)
- {
- try {
+ try {
+ tupIter = inp;
+ key = k.getValueAsPigType();
+ if (useSecondaryKey) {
key = ((Tuple)key).get(0);
- } catch (ExecException e) {
- // TODO Exception
- throw new RuntimeException(e);
+
}
- }
- if(isKeyTuple) {
- // key is a tuple, cache the key as a
- // tuple for use in the getNext()
- keyAsTuple = (Tuple)key;
+ if(isKeyTuple) {
+ // key is a tuple, cache the key as a
+ // tuple for use in the getNext()
+ keyAsTuple = (Tuple)key;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Error attaching input for key " + k +
+ " in " + name() + " at location " + getOriginalLocations(), e);
}
}
@@ -202,7 +203,7 @@ public class POPackage extends PhysicalO
public void setNumInps(int numInps) {
this.numInputs = numInps;
}
-
+
public boolean[] getInner() {
return inner;
}
@@ -210,7 +211,7 @@ public class POPackage extends PhysicalO
public void setInner(boolean[] inner) {
this.inner = inner;
}
-
+
/**
* From the inputs, constructs the output tuple
* for this co-group in the required format which
@@ -219,7 +220,7 @@ public class POPackage extends PhysicalO
@Override
public Result getNext(Tuple t) throws ExecException {
Tuple res;
-
+
if(firstTime){
firstTime = false;
if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -229,17 +230,17 @@ public class POPackage extends PhysicalO
}
}
}
-
+
if(distinct) {
// only set the key which has the whole
- // tuple
+ // tuple
res = mTupleFactory.newTuple(1);
res.set(0, key);
} else {
//Create numInputs bags
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
-
+
if (isAccumulative()) {
// create bag wrapper to pull tuples in many batches
// all bags have reference to the sample tuples buffer
@@ -248,30 +249,30 @@ public class POPackage extends PhysicalO
for (int i = 0; i < numInputs; i++) {
dbs[i] = new AccumulativeBag(buffer, i);
}
-
+
} else {
// create bag to pull all tuples out of iterator
for (int i = 0; i < numInputs; i++) {
dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
- // In a very rare case if there is a POStream after this
+ // In a very rare case if there is a POStream after this
// POPackage in the pipeline and is also blocking the pipeline;
// constructor argument should be 2 * numInputs. But for one obscure
- // case we don't want to pay the penalty all the time.
- : new InternalCachedBag(numInputs);
- }
+ // case we don't want to pay the penalty all the time.
+ : new InternalCachedBag(numInputs);
+ }
//For each indexed tup in the inp, sort them
//into their corresponding bags based
//on the index
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
-
+ Tuple copy = getValueTuple(ntup, index);
+
if (numInputs == 1) {
-
- // this is for multi-query merge where
+
+ // this is for multi-query merge where
// the numInputs is always 1, but the index
- // (the position of the inner plan in the
+ // (the position of the inner plan in the
// enclosed operator) may not be 1.
dbs[0].add(copy);
} else {
@@ -280,7 +281,7 @@ public class POPackage extends PhysicalO
if(reporter!=null) reporter.progress();
}
}
-
+
//Construct the output tuple by appending
//the key and all the above constructed bags
//and return it.
@@ -296,8 +297,8 @@ public class POPackage extends PhysicalO
r.returnStatus = POStatus.STATUS_NULL;
return r;
}
- }
-
+ }
+
res.set(i+1,bag);
}
}
@@ -315,26 +316,26 @@ public class POPackage extends PhysicalO
// Need to make a copy of the value, as hadoop uses the same ntup
// to represent each value.
Tuple val = (Tuple)ntup.getValueAsPigType();
-
+
Tuple copy = null;
// The "value (val)" that we just got may not
// be the complete "value". It may have some portions
// in the "key" (look in POLocalRearrange for more comments)
// If this is the case we need to stitch
- // the "value" together.
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(index);
+ // the "value" together.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(index);
boolean isProjectStar = lrKeyInfo.first;
Map<Integer, Integer> keyLookup = lrKeyInfo.second;
int keyLookupSize = keyLookup.size();
if( keyLookupSize > 0) {
-
+
// we have some fields of the "value" in the
// "key".
int finalValueSize = keyLookupSize + val.size();
copy = mTupleFactory.newTuple(finalValueSize);
- int valIndex = 0; // an index for accessing elements from
+ int valIndex = 0; // an index for accessing elements from
// the value (val) that we have currently
for(int i = 0; i < finalValueSize; i++) {
Integer keyIndex = keyLookup.get(i);
@@ -357,12 +358,12 @@ public class POPackage extends PhysicalO
}
copy = illustratorMarkup2(val, copy);
} else if (isProjectStar) {
-
+
// the whole "value" is present in the "key"
copy = mTupleFactory.newTuple(keyAsTuple.getAll());
copy = illustratorMarkup2(keyAsTuple, copy);
} else {
-
+
// there is no field of the "value" in the
// "key" - so just make a copy of what we got
// as the "value"
@@ -371,7 +372,7 @@ public class POPackage extends PhysicalO
}
return copy;
}
-
+
public byte getKeyType() {
return keyType;
}
@@ -379,9 +380,9 @@ public class POPackage extends PhysicalO
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
-
+
/**
- * Make a deep copy of this operator.
+ * Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
@Override
@@ -417,7 +418,7 @@ public class POPackage extends PhysicalO
public void setKeyTuple(boolean keyTuple) {
this.isKeyTuple = keyTuple;
}
-
+
/**
* @param keyCompound the keyCompound to set
*/
@@ -445,7 +446,7 @@ public class POPackage extends PhysicalO
public void setDistinct(boolean distinct) {
this.distinct = distinct;
}
-
+
public void setUseSecondaryKey(boolean useSecondaryKey) {
this.useSecondaryKey = useSecondaryKey;
}
@@ -453,11 +454,11 @@ public class POPackage extends PhysicalO
public void setPackageType(PackageType type) {
this.pkgType = type;
}
-
+
public PackageType getPackageType() {
return pkgType;
}
-
+
class POPackageTupleBuffer implements AccumulativeTupleBuffer {
private List<Tuple>[] bags;
private Iterator<NullableTuple> iter;
@@ -465,15 +466,15 @@ public class POPackage extends PhysicalO
private Object currKey;
@SuppressWarnings("unchecked")
- public POPackageTupleBuffer() {
+ public POPackageTupleBuffer() {
batchSize = 20000;
if (PigMapReduce.sJobConfInternal.get() != null) {
String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
if (size != null) {
batchSize = Integer.parseInt(size);
}
- }
-
+ }
+
this.bags = new List[numInputs];
for(int i=0; i<numInputs; i++) {
this.bags[i] = new ArrayList<Tuple>();
@@ -481,7 +482,7 @@ public class POPackage extends PhysicalO
this.iter = tupIter;
this.currKey = key;
}
-
+
@Override
public boolean hasNextBatch() {
return iter.hasNext();
@@ -492,18 +493,18 @@ public class POPackage extends PhysicalO
for(int i=0; i<bags.length; i++) {
bags[i].clear();
}
-
- key = currKey;
+
+ key = currKey;
for(int i=0; i<batchSize; i++) {
if (iter.hasNext()) {
NullableTuple ntup = iter.next();
int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
+ Tuple copy = getValueTuple(ntup, index);
if (numInputs == 1) {
-
- // this is for multi-query merge where
+
+ // this is for multi-query merge where
// the numInputs is always 1, but the index
- // (the position of the inner plan in the
+ // (the position of the inner plan in the
// enclosed operator) may not be 1.
bags[0].add(copy);
} else {
@@ -513,24 +514,24 @@ public class POPackage extends PhysicalO
break;
}
}
- }
-
+ }
+
public void clear() {
for(int i=0; i<bags.length; i++) {
bags[i].clear();
}
iter = null;
}
-
- public Iterator<Tuple> getTuples(int index) {
+
+ public Iterator<Tuple> getTuples(int index) {
return bags[index].iterator();
}
-
+
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
}
};
-
+
private Tuple illustratorMarkup2(Object in, Object out) {
if(illustrator != null) {
ExampleTuple tOut = new ExampleTuple((Tuple) out);
@@ -541,7 +542,7 @@ public class POPackage extends PhysicalO
} else
return (Tuple) out;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
@@ -600,4 +601,5 @@ public class POPackage extends PhysicalO
} else
return (Tuple) out;
}
+
}
Modified: pig/trunk/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/Operator.java?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/Operator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/Operator.java Fri Dec 7 23:18:46 2012
@@ -82,11 +82,7 @@ abstract public class Operator<V extends
@Override
public String toString() {
- StringBuilder msg = new StringBuilder();
-
- msg.append("(Name: " + name() + " Operator Key: " + mKey + ")");
-
- return msg.toString();
+ return "(Name: " + name() + " Operator Key: " + mKey + ")";
}
/**