You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/12/08 00:18:47 UTC

svn commit: r1418553 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java src/org/apache/pig/impl/plan/Operator.java

Author: julien
Date: Fri Dec  7 23:18:46 2012
New Revision: 1418553

URL: http://svn.apache.org/viewvc?rev=1418553&view=rev
Log:
PIG-3084: Improve exception messages in POPackage

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/impl/plan/Operator.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec  7 23:18:46 2012
@@ -58,6 +58,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3084: Improve exceptions messages in POPackage (julien)
+
 PIG-3072: Pig job reporting negative progress (knoguchi via rohini)
 
 PIG-3014: CurrentTime() UDF has undesirable characteristics (jcoveney via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Dec  7 23:18:46 2012
@@ -55,7 +55,7 @@ import org.apache.pig.pen.util.LineageTr
  * This is last stage of processing co-group.
  * This operator has a slightly different
  * format than other operators in that, it
- * takes two things as input. The key being 
+ * takes two things as input. The key being
  * worked on and the iterator of bags that
  * contain indexed tuples that just need to
  * be packaged into their appropriate output
@@ -63,13 +63,13 @@ import org.apache.pig.pen.util.LineageTr
  */
 public class POPackage extends PhysicalOperator {
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
 
     public static enum PackageType { GROUP, JOIN };
-    
+
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
@@ -77,14 +77,14 @@ public class POPackage extends PhysicalO
 
     //The key being worked on
     Object key;
-    
+
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
     // marker to indicate if the tuple key is compound in nature
     protected boolean isKeyCompound = false;
     // key as a Tuple object (if the key is a tuple)
     protected Tuple keyAsTuple;
-    
+
     //key's type
     byte keyType;
 
@@ -92,33 +92,33 @@ public class POPackage extends PhysicalO
     //co-group.  0 indicates a distinct, which means there will only be a
     //key, no value.
     int numInputs;
-    
+
     // If the attaching map-reduce plan use secondary sort key
     boolean useSecondaryKey = false;
-    
+
     //Denotes if inner is specified
     //on a particular input
     boolean[] inner;
-    
+
     // flag to denote whether there is a distinct
     // leading to this package
     protected boolean distinct = false;
-    
+
     // A mapping of input index to key information got from LORearrange
     // for that index. The Key information is a pair of boolean, Map.
-    // The boolean indicates whether there is a lone project(*) in the 
-    // cogroup by. If not, the Map has a mapping of column numbers in the 
+    // The boolean indicates whether there is a lone project(*) in the
+    // cogroup by. If not, the Map has a mapping of column numbers in the
     // "value" to column numbers in the "key" which contain the fields in
     // the "value"
     protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
-    
+
     private boolean firstTime = true;
-    
+
     private boolean useDefaultBag = false;
-    
+
     private PackageType pkgType;
 
     public POPackage(OperatorKey k) {
@@ -161,7 +161,7 @@ public class POPackage extends PhysicalO
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     /**
      * Attaches the required inputs
      * @param k - the key being worked on
@@ -169,21 +169,22 @@ public class POPackage extends PhysicalO
      *              obtained from Hadoop
      */
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
-        tupIter = inp;
-        key = k.getValueAsPigType();
-        if (useSecondaryKey)
-        {
-            try {
+        try {
+            tupIter = inp;
+            key = k.getValueAsPigType();
+            if (useSecondaryKey) {
                 key = ((Tuple)key).get(0);
-            } catch (ExecException e) {
-                // TODO Exception
-                throw new RuntimeException(e);
+
             }
-        }
-        if(isKeyTuple) {
-            // key is a tuple, cache the key as a
-            // tuple for use in the getNext()
-            keyAsTuple = (Tuple)key;
+            if(isKeyTuple) {
+                // key is a tuple, cache the key as a
+                // tuple for use in the getNext()
+                keyAsTuple = (Tuple)key;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Error attaching input for key " + k +
+                    " in " + name() + " at location " + getOriginalLocations(), e);
         }
     }
 
@@ -202,7 +203,7 @@ public class POPackage extends PhysicalO
     public void setNumInps(int numInps) {
         this.numInputs = numInps;
     }
-    
+
     public boolean[] getInner() {
         return inner;
     }
@@ -210,7 +211,7 @@ public class POPackage extends PhysicalO
     public void setInner(boolean[] inner) {
         this.inner = inner;
     }
-   
+
     /**
      * From the inputs, constructs the output tuple
      * for this co-group in the required format which
@@ -219,7 +220,7 @@ public class POPackage extends PhysicalO
     @Override
     public Result getNext(Tuple t) throws ExecException {
         Tuple res;
-        
+
         if(firstTime){
             firstTime = false;
             if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -229,17 +230,17 @@ public class POPackage extends PhysicalO
                 }
             }
         }
-        
+
         if(distinct) {
             // only set the key which has the whole
-            // tuple 
+            // tuple
             res = mTupleFactory.newTuple(1);
             res.set(0, key);
         } else {
             //Create numInputs bags
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
-                 
+
             if (isAccumulative()) {
                 // create bag wrapper to pull tuples in many batches
                 // all bags have reference to the sample tuples buffer
@@ -248,30 +249,30 @@ public class POPackage extends PhysicalO
                 for (int i = 0; i < numInputs; i++) {
                     dbs[i] = new AccumulativeBag(buffer, i);
                 }
-                
+
             } else {
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
                     dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
-                    // In a very rare case if there is a POStream after this 
+                    // In a very rare case if there is a POStream after this
                     // POPackage in the pipeline and is also blocking the pipeline;
                     // constructor argument should be 2 * numInputs. But for one obscure
-                    // case we don't want to pay the penalty all the time.                
-                            : new InternalCachedBag(numInputs);                    
-                }                               
+                    // case we don't want to pay the penalty all the time.
+                            : new InternalCachedBag(numInputs);
+                }
                 //For each indexed tup in the inp, sort them
                 //into their corresponding bags based
                 //on the index
                 while (tupIter.hasNext()) {
                     NullableTuple ntup = tupIter.next();
                     int index = ntup.getIndex();
-                    Tuple copy = getValueTuple(ntup, index);  
-                    
+                    Tuple copy = getValueTuple(ntup, index);
+
                     if (numInputs == 1) {
-                        
-                        // this is for multi-query merge where 
+
+                        // this is for multi-query merge where
                         // the numInputs is always 1, but the index
-                        // (the position of the inner plan in the 
+                        // (the position of the inner plan in the
                         // enclosed operator) may not be 1.
                         dbs[0].add(copy);
                     } else {
@@ -280,7 +281,7 @@ public class POPackage extends PhysicalO
                     if(reporter!=null) reporter.progress();
                 }
             }
-                      
+
             //Construct the output tuple by appending
             //the key and all the above constructed bags
             //and return it.
@@ -296,8 +297,8 @@ public class POPackage extends PhysicalO
                         r.returnStatus = POStatus.STATUS_NULL;
                         return r;
                     }
-                } 
-                
+                }
+
                 res.set(i+1,bag);
             }
         }
@@ -315,26 +316,26 @@ public class POPackage extends PhysicalO
      // Need to make a copy of the value, as hadoop uses the same ntup
         // to represent each value.
         Tuple val = (Tuple)ntup.getValueAsPigType();
-        
+
         Tuple copy = null;
         // The "value (val)" that we just got may not
         // be the complete "value". It may have some portions
         // in the "key" (look in POLocalRearrange for more comments)
         // If this is the case we need to stitch
-        // the "value" together.        
-        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = 
-            keyInfo.get(index);           
+        // the "value" together.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+            keyInfo.get(index);
         boolean isProjectStar = lrKeyInfo.first;
         Map<Integer, Integer> keyLookup = lrKeyInfo.second;
         int keyLookupSize = keyLookup.size();
 
         if( keyLookupSize > 0) {
-        
+
             // we have some fields of the "value" in the
             // "key".
             int finalValueSize = keyLookupSize + val.size();
             copy = mTupleFactory.newTuple(finalValueSize);
-            int valIndex = 0; // an index for accessing elements from 
+            int valIndex = 0; // an index for accessing elements from
                               // the value (val) that we have currently
             for(int i = 0; i < finalValueSize; i++) {
                 Integer keyIndex = keyLookup.get(i);
@@ -357,12 +358,12 @@ public class POPackage extends PhysicalO
             }
             copy = illustratorMarkup2(val, copy);
         } else if (isProjectStar) {
-            
+
             // the whole "value" is present in the "key"
             copy = mTupleFactory.newTuple(keyAsTuple.getAll());
             copy = illustratorMarkup2(keyAsTuple, copy);
         } else {
-            
+
             // there is no field of the "value" in the
             // "key" - so just make a copy of what we got
             // as the "value"
@@ -371,7 +372,7 @@ public class POPackage extends PhysicalO
         }
         return copy;
     }
-    
+
     public byte getKeyType() {
         return keyType;
     }
@@ -379,9 +380,9 @@ public class POPackage extends PhysicalO
     public void setKeyType(byte keyType) {
         this.keyType = keyType;
     }
-  
+
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override
@@ -417,7 +418,7 @@ public class POPackage extends PhysicalO
     public void setKeyTuple(boolean keyTuple) {
         this.isKeyTuple = keyTuple;
     }
-    
+
     /**
      * @param keyCompound the keyCompound to set
      */
@@ -445,7 +446,7 @@ public class POPackage extends PhysicalO
     public void setDistinct(boolean distinct) {
         this.distinct = distinct;
     }
-    
+
     public void setUseSecondaryKey(boolean useSecondaryKey) {
         this.useSecondaryKey = useSecondaryKey;
     }
@@ -453,11 +454,11 @@ public class POPackage extends PhysicalO
     public void setPackageType(PackageType type) {
         this.pkgType = type;
     }
-    
+
     public PackageType getPackageType() {
         return pkgType;
     }
-    
+
     class POPackageTupleBuffer implements AccumulativeTupleBuffer {
         private List<Tuple>[] bags;
         private Iterator<NullableTuple> iter;
@@ -465,15 +466,15 @@ public class POPackage extends PhysicalO
         private Object currKey;
 
         @SuppressWarnings("unchecked")
-        public POPackageTupleBuffer() {    		
+        public POPackageTupleBuffer() {
             batchSize = 20000;
             if (PigMapReduce.sJobConfInternal.get() != null) {
                 String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
                 if (size != null) {
                     batchSize = Integer.parseInt(size);
                 }
-            }		
-            
+            }
+
             this.bags = new List[numInputs];
             for(int i=0; i<numInputs; i++) {
                 this.bags[i] = new ArrayList<Tuple>();
@@ -481,7 +482,7 @@ public class POPackage extends PhysicalO
             this.iter = tupIter;
             this.currKey = key;
         }
-        
+
         @Override
         public boolean hasNextBatch() {
             return iter.hasNext();
@@ -492,18 +493,18 @@ public class POPackage extends PhysicalO
             for(int i=0; i<bags.length; i++) {
                 bags[i].clear();
             }
-                        
-            key = currKey;			
+
+            key = currKey;
             for(int i=0; i<batchSize; i++) {
                 if (iter.hasNext()) {
                      NullableTuple ntup = iter.next();
                      int index = ntup.getIndex();
-                     Tuple copy = getValueTuple(ntup, index);		            
+                     Tuple copy = getValueTuple(ntup, index);
                      if (numInputs == 1) {
-                            
-                            // this is for multi-query merge where 
+
+                            // this is for multi-query merge where
                             // the numInputs is always 1, but the index
-                            // (the position of the inner plan in the 
+                            // (the position of the inner plan in the
                             // enclosed operator) may not be 1.
                             bags[0].add(copy);
                      } else {
@@ -513,24 +514,24 @@ public class POPackage extends PhysicalO
                     break;
                 }
             }
-        } 
-        
+        }
+
         public void clear() {
             for(int i=0; i<bags.length; i++) {
                 bags[i].clear();
             }
             iter = null;
         }
-        
-        public Iterator<Tuple> getTuples(int index) {			
+
+        public Iterator<Tuple> getTuples(int index) {
             return bags[index].iterator();
         }
-        
+
         public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
             return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
         }
        };
-       
+
        private Tuple illustratorMarkup2(Object in, Object out) {
            if(illustrator != null) {
                ExampleTuple tOut = new ExampleTuple((Tuple) out);
@@ -541,7 +542,7 @@ public class POPackage extends PhysicalO
            } else
                return (Tuple) out;
        }
-       
+
        @Override
        public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
            if(illustrator != null) {
@@ -600,4 +601,5 @@ public class POPackage extends PhysicalO
            } else
                return (Tuple) out;
        }
+
 }

Modified: pig/trunk/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/Operator.java?rev=1418553&r1=1418552&r2=1418553&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/Operator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/Operator.java Fri Dec  7 23:18:46 2012
@@ -82,11 +82,7 @@ abstract public class Operator<V extends
 
     @Override
     public String toString() {
-        StringBuilder msg = new StringBuilder();
-
-        msg.append("(Name: " + name() + " Operator Key: " + mKey + ")");
-
-        return msg.toString();
+        return "(Name: " + name() + " Operator Key: " + mKey + ")";
     }
     
     /**