Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/26 19:05:06 UTC

svn commit: r829883 [2/2] - in /hadoop/pig/branches/load-store-redesign: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executioneng...

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java Mon Oct 26 18:05:05 2009
@@ -17,31 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
  * This class is the static Mapper class used by Pig
@@ -62,12 +40,12 @@
 
 public class PigMapOnly {
 
-    public static class Map extends PigMapBase implements
-            Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    public static class Map extends PigMapBase {
 
         @Override
-        public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
-            oc.collect(null, tuple);
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {            
+            oc.write(null, tuple);
         }
     }
 }
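
For readers not yet used to the Hadoop 0.20 ("new") MapReduce API this branch is adopting: the hunk above drops the old Mapper interface and OutputCollector.collect() in favor of a task Context whose write() emits output. A minimal stand-alone sketch of that pattern, with illustrative class and type names that are not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative only: a map-only task in the new API writes straight to
    // the Context; there is no OutputCollector or Reporter parameter.
    public class IdentityMapOnly
            extends Mapper<LongWritable, Text, NullWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Equivalent of the old oc.collect(null, tuple): ignore the key
            // and pass the value through.
            context.write(NullWritable.get(), value);
        }
    }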

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Oct 26 18:05:05 2009
@@ -25,42 +25,34 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.log4j.PropertyConfigurator;
-
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.WrappedIOException;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.impl.io.NullablePartitionWritable;
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -87,25 +79,29 @@
  */
 public class PigMapReduce {
 
-    public static JobConf sJobConf = null;
+    public static JobContext sJobContext = null;
+    public static Configuration sJobConf = null;
     private final static Tuple DUMMYTUPLE = null;
     
-    public static class Map extends PigMapBase implements
-            Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    public static class Map extends PigMapBase {
 
         @Override
-        public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(tuple.get(1), keyType);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             key.setIndex(index);
-            val.setIndex(index);
-            oc.collect(key, val);
+            val.setIndex(index);         	
+            	
+            oc.write(key, val);
         }
     }
     
@@ -115,12 +111,12 @@
      * to be handed tuples. Hence this map class ensures that the "key" used
      * in the order by is wrapped into a tuple (if it isn't already a tuple)
      */
-    public static class MapWithComparator extends PigMapBase implements
-            Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    public static class MapWithComparator extends PigMapBase {
 
         @Override
-        public void collect(OutputCollector<PigNullableWritable, Writable> oc,
-                Tuple tuple) throws ExecException, IOException {
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
             Object keyTuple = null;
             if(keyType != DataType.TUPLE) {
                 Object k = tuple.get(1);
@@ -134,110 +130,115 @@
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             key.setIndex(index);
             val.setIndex(index);
-            oc.collect(key, val);
+
+            oc.write(key, val);
         }
     }
 
-	/**
-	 * Used by Skewed Join
-	 */
-    public static class MapWithPartitionIndex extends Map implements
-            Mapper<Text, Tuple, PigNullableWritable, Writable> {    	
+    /**
+     * Used by Skewed Join
+     */
+    public static class MapWithPartitionIndex extends Map {
 
         @Override
-        public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {			
-			Byte tupleKeyIdx = 2;
-			Byte tupleValIdx = 3;
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
+            Byte tupleKeyIdx = 2;
+            Byte tupleValIdx = 3;
 
             Byte index = (Byte)tuple.get(0);
-			Byte partitionIndex = -1;
-        	// for partitioning table, the partition index isn't present
-			if (tuple.size() == 3) {
-				//super.collect(oc, tuple);
-				//return;
-				tupleKeyIdx--;
-				tupleValIdx--;
-			} else {
-				partitionIndex = (Byte)tuple.get(1);
-			}
+            Byte partitionIndex = -1;
+            // for partitioning table, the partition index isn't present
+            if (tuple.size() == 3) {
+                //super.collect(oc, tuple);
+                //return;
+                tupleKeyIdx--;
+                tupleValIdx--;
+            } else {
+                partitionIndex = (Byte)tuple.get(1);
+            }
 
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), DataType.TUPLE);
 
-			NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
-			//key.setIndex(index);
-			//NullableTuple wrappedKey = new NullableTuple((Tuple)tuple);
+            NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
 
-			NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+            
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             wrappedKey.setIndex(index);
-			// set the partition
-			wrappedKey.setPartition(partitionIndex);
-			val.setIndex(index);
-            oc.collect(wrappedKey, val);
-            //oc.collect(key, val);
-        }
-
-		@Override
-		protected void runPipeline(PhysicalOperator leaf) throws IOException, ExecException {			
-			while(true){								
-				Result res = leaf.getNext(DUMMYTUPLE);
-				
-				if(res.returnStatus==POStatus.STATUS_OK){
-					// For POPartitionRearrange, the result is a bag. This operator is used for 
-					// skewed join
-					if (res.result instanceof DataBag) {					
-						Iterator<Tuple> its = ((DataBag)res.result).iterator();
-						while(its.hasNext()) {																
-							collect(outputCollector, its.next());
-						}
-					}else{											
-						collect(outputCollector, (Tuple)res.result);
-					}
-					continue;
-				}
-				
-				if(res.returnStatus==POStatus.STATUS_EOP) {
-					return;
-				}
-				
-				if(res.returnStatus==POStatus.STATUS_NULL)
-					continue;
-				
-				if(res.returnStatus==POStatus.STATUS_ERR){
-					// remember that we had an issue so that in 
-					// close() we can do the right thing
-					errorInMap  = true;
-					// if there is an errmessage use it
-					String errMsg;
-					if(res.result != null) {
-						errMsg = "Received Error while " +
-						"processing the map plan: " + res.result;
-					} else {
-						errMsg = "Received Error while " +
-						"processing the map plan.";
-					}
-						
-					int errCode = 2055;
-					ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
-					throw ee;
-				}
-			}
-		}
-	}
-
-    public static class Reduce extends MapReduceBase
-            implements
-            Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+            
+            // set the partition
+            wrappedKey.setPartition(partitionIndex);
+            val.setIndex(index);
+
+            oc.write(wrappedKey, val);
+        }
+
+        @Override
+        protected void runPipeline(PhysicalOperator leaf) 
+                throws IOException, InterruptedException {
+            
+            while(true){
+                Result res = leaf.getNext(DUMMYTUPLE);
+                
+                if(res.returnStatus==POStatus.STATUS_OK){
+                    // For POPartitionRearrange, the result is a bag. 
+                    // This operator is used for skewed join
+                    if (res.result instanceof DataBag) {
+                        Iterator<Tuple> its = ((DataBag)res.result).iterator();
+                        while(its.hasNext()) {
+                            collect(outputCollector, its.next());
+                        }
+                    }else{
+                        collect(outputCollector, (Tuple)res.result);
+                    }
+                    continue;
+                }
+                
+                if(res.returnStatus==POStatus.STATUS_EOP) {
+                    return;
+                }
+
+                if(res.returnStatus==POStatus.STATUS_NULL) {
+                    continue;
+                }
+
+                if(res.returnStatus==POStatus.STATUS_ERR){
+                    // remember that we had an issue so that in 
+                    // close() we can do the right thing
+                    errorInMap  = true;
+                    // if there is an errmessage use it
+                    String errMsg;
+                    if(res.result != null) {
+                        errMsg = "Received Error while " +
+                            "processing the map plan: " + res.result;
+                    } else {
+                        errMsg = "Received Error while " +
+                            "processing the map plan.";
+                    }
+
+                    int errCode = 2055;
+                    throw new ExecException(errMsg, errCode, PigException.BUG);
+                }
+            }
+        }
+    }
+
+    public static class Reduce 
+            extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+        
         protected final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
@@ -255,7 +256,7 @@
         
         ProgressableReporter pigReporter;
 
-        protected OutputCollector<PigNullableWritable, Writable> outputCollector;
+        protected Context outputCollector;
 
         protected boolean errorInReduce = false;
         
@@ -272,10 +273,13 @@
          */
         @SuppressWarnings("unchecked")
         @Override
-        public void configure(JobConf jConf) {
-            super.configure(jConf);
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            
+            Configuration jConf = context.getConfiguration();
             SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
-            sJobConf = jConf;
+            sJobContext = context;
+            sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
@@ -297,8 +301,6 @@
                 }
                 // till here
                 
-                long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
-
                 pigReporter = new ProgressableReporter();
                 if(!(rp.isEmpty())) {
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
@@ -315,37 +317,36 @@
          * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
-         */
-        public void reduce(PigNullableWritable key,
-                Iterator<NullableTuple> tupIter,
-                OutputCollector<PigNullableWritable, Writable> oc,
-                Reporter reporter) throws IOException {
+         */       
+        @Override
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+                throws IOException, InterruptedException {            
             
             if (!initialized) {
                 initialized = true;
                 
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
-                this.outputCollector = oc;
-                pigReporter.setRep(reporter);
+                this.outputCollector = context;
+                pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-	        
+
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setReporter(reporter);
+                pigHadoopLogger.setReporter(context);
+                
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
                 for (POStore store: stores) {
                     MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
-                    impl.setReporter(reporter);
+                        = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
                 }
             }
-
+          
             // In the case we optimize the join, we combine
             // POPackage and POForeach - so we could get many
             // tuples out of the getnext() call of POJoinPackage
@@ -353,72 +354,76 @@
             // POJoinPacakage.getNext()
             if (pack instanceof POJoinPackage)
             {
-                pack.attachInput(key, tupIter);
+                pack.attachInput(key, tupIter.iterator());
                 while (true)
                 {
-                    if (processOnePackageOutput(oc))
+                    if (processOnePackageOutput(context))
                         break;
                 }
             }
             else {
                 // join is not optimized, so package will
                 // give only one tuple out for the key
-                pack.attachInput(key, tupIter);
-                processOnePackageOutput(oc);
-            }
+                pack.attachInput(key, tupIter.iterator());
+                processOnePackageOutput(context);
+            } 
         }
         
         // return: false-more output
         //         true- end of processing
-        public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException
-        {
-            try {
-                Result res = pack.getNext(DUMMYTUPLE);
-                if(res.returnStatus==POStatus.STATUS_OK){
-                    Tuple packRes = (Tuple)res.result;
-                    
-                    if(rp.isEmpty()){
-                        oc.collect(null, packRes);
-                        return false;
-                    }
-                    for (int i = 0; i < roots.length; i++) {
-                        roots[i].attachInput(packRes);
-                    }
-                    runPipeline(leaf);
-                    
-                }
+        public boolean processOnePackageOutput(Context oc) 
+                throws IOException, InterruptedException {
+
+            Result res = pack.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                Tuple packRes = (Tuple)res.result;
                 
-                if(res.returnStatus==POStatus.STATUS_NULL) {
+                if(rp.isEmpty()){
+                    oc.write(null, packRes);
                     return false;
                 }
-                
-                if(res.returnStatus==POStatus.STATUS_ERR){
-                    int errCode = 2093;
-                    String msg = "Encountered error in package operator while processing group.";
-                    throw new ExecException(msg, errCode, PigException.BUG);
+                for (int i = 0; i < roots.length; i++) {
+                    roots[i].attachInput(packRes);
                 }
+                runPipeline(leaf);
                 
-                if(res.returnStatus==POStatus.STATUS_EOP) {
-                    return true;
-                }
-                    
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL) {
                 return false;
-            } catch (ExecException e) {
-                throw e;
             }
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                int errCode = 2093;
+                String msg = "Encountered error in package operator while processing group.";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return true;
+            }
+                
+            return false;
+            
         }
         
         /**
          * @param leaf
-         * @throws ExecException 
+         * @throws InterruptedException
          * @throws IOException 
          */
-        protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+        protected void runPipeline(PhysicalOperator leaf) 
+                throws InterruptedException, IOException {
+            
             while(true)
             {
                 Result redRes = leaf.getNext(DUMMYTUPLE);
                 if(redRes.returnStatus==POStatus.STATUS_OK){
-                   	outputCollector.collect(null, (Tuple)redRes.result);
+                    try{
+                        outputCollector.write(null, (Tuple)redRes.result);
+                    }catch(Exception e) {
+                        throw new IOException(e);
+                    }
                     continue;
                 }
                 
@@ -426,8 +431,9 @@
                     return;
                 }
                 
-                if(redRes.returnStatus==POStatus.STATUS_NULL)
+                if(redRes.returnStatus==POStatus.STATUS_NULL) {
                     continue;
+                }
                 
                 if(redRes.returnStatus==POStatus.STATUS_ERR){
                     // remember that we had an issue so that in 
@@ -446,17 +452,15 @@
                     throw new ExecException(msg, errCode, PigException.BUG);
                 }
             }
-
-        
         }
         
         /**
          * Will be called once all the intermediate keys and values are
          * processed. So right place to stop the reporter thread.
          */
-        @Override
-        public void close() throws IOException {
-            super.close();
+        @Override 
+        protected void cleanup(Context context) throws IOException, InterruptedException {
+            super.cleanup(context);
             
             if(errorInReduce) {
                 // there was an error in reduce - just return
@@ -471,17 +475,13 @@
                 // This will result in nothing happening in the case
                 // where there is no stream in the pipeline
                 rp.endOfAllInput = true;
-                try {
-                    runPipeline(leaf);
-                } catch (ExecException e) {
-                     throw e;
-                }
+                runPipeline(leaf);
             }
 
             for (POStore store: stores) {
                 if (!initialized) {
                     MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                        = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
                 }
@@ -519,8 +519,8 @@
          * and the reporter thread
          */
         @Override
-        public void configure(JobConf jConf) {
-            super.configure(jConf);
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
             keyType = pack.getKeyType();
         }
 
@@ -530,31 +530,29 @@
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
          */
-        public void reduce(PigNullableWritable key,
-                Iterator<NullableTuple> tupIter,
-                OutputCollector<PigNullableWritable, Writable> oc,
-                Reporter reporter) throws IOException {
+        @Override
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+                throws IOException, InterruptedException {
             
             if (!initialized) {
                 initialized = true;
                 
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
-                this.outputCollector = oc;
-                pigReporter.setRep(reporter);
+                this.outputCollector = context;
+                pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setReporter(reporter);
+                pigHadoopLogger.setReporter(context);
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
                 
                 for (POStore store: stores) {
                     MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
-                    impl.setReporter(reporter);
+                        = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
                 }
@@ -574,43 +572,38 @@
                 }
             }
             
-            pack.attachInput(key, tupIter);
+            pack.attachInput(key, tupIter.iterator());
             
-            try {
-                Result res = pack.getNext(DUMMYTUPLE);
-                if(res.returnStatus==POStatus.STATUS_OK){
-                    Tuple packRes = (Tuple)res.result;
-                    
-                    if(rp.isEmpty()){
-                        oc.collect(null, packRes);
-                        return;
-                    }
-                    
-                    rp.attachInput(packRes);
-
-                    List<PhysicalOperator> leaves = rp.getLeaves();
-                    
-                    PhysicalOperator leaf = leaves.get(0);
-                    runPipeline(leaf);
-                    
-                }
+            Result res = pack.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                Tuple packRes = (Tuple)res.result;
                 
-                if(res.returnStatus==POStatus.STATUS_NULL) {
+                if(rp.isEmpty()){
+                    context.write(null, packRes);
                     return;
                 }
                 
-                if(res.returnStatus==POStatus.STATUS_ERR){
-                    int errCode = 2093;
-                    String msg = "Encountered error in package operator while processing group.";
-                    throw new ExecException(msg, errCode, PigException.BUG);
-                }
-                    
+                rp.attachInput(packRes);
+
+                List<PhysicalOperator> leaves = rp.getLeaves();
+                
+                PhysicalOperator leaf = leaves.get(0);
+                runPipeline(leaf);
                 
-            } catch (ExecException e) {
-                throw e;
             }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL) {
+                return;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                int errCode = 2093;
+                String msg = "Encountered error in package operator while processing group.";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+
         }
 
     }
-    
+   
 }
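
The PigMapReduce changes above follow the general mapping between the two APIs: configure(JobConf) becomes setup(Context), reduce(key, Iterator, OutputCollector, Reporter) becomes reduce(key, Iterable, Context), and close() becomes cleanup(Context), with the Context taking over the roles of both the OutputCollector and the Reporter. A minimal sketch of that lifecycle, using an illustrative summing reducer rather than Pig's classes:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative only: how the old configure()/reduce()/close() lifecycle
    // maps onto the new API's setup()/reduce()/cleanup().
    public class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private Configuration conf;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // configure(JobConf) used to receive the job settings directly;
            // here they come off the Context.
            conf = context.getConfiguration();
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Values now arrive as an Iterable rather than an Iterator,
            // hence the tupIter.iterator() calls in the hunks above.
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // context.write() replaces OutputCollector.collect().
            context.write(key, new IntWritable(sum));
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Work the old close() did (for example draining a pipeline,
            // as PigMapReduce.Reduce does above) moves here.
        }
    }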

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Oct 26 18:05:05 2009
@@ -19,16 +19,23 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.text.NumberFormat;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFunc;
@@ -37,6 +44,7 @@
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 
 /**
@@ -45,16 +53,31 @@
  * image of PigInputFormat having RecordWriter instead
  * of a RecordReader.
  */
-public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
-    public static final String PIG_OUTPUT_FUNC = "pig.output.func";
+@SuppressWarnings("unchecked")
+public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
 
+    /** hadoop job output directory */
+    public static final String MAPRED_OUTPUT_DIR = "mapred.output.dir";
+    /** hadoop partition number */ 
+    public static final String MAPRED_TASK_PARTITION = "mapred.task.partition";
+    
+    /** the temporary directory for the multi store */
+    public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
+    /** the relative path that can be used to build a temporary
+     * place to store the output from a number of map-reduce tasks*/
+    public static final String PIG_TMP_PATH =  "pig.tmp.path";
+    
+    private FileOutputCommitter committer = null;
+    
+    private final Log log = LogFactory.getLog(getClass());
+    
     /**
      * In general, the mechanism for an OutputFormat in Pig to get hold of the storeFunc
      * and the metadata information (for now schema and location provided for the store in
      * the pig script) is through the following Utility static methods:
-     * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(JobConf)} 
+     * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(Configuration)} 
      * - this will get the {@link org.apache.pig.StoreFunc} reference to use in the RecordWriter.write()
-     * {@link MapRedUtil#getStoreConfig(JobConf)} - this will get the {@link org.apache.pig.StoreConfig}
+     * {@link MapRedUtil#getStoreConfig(Configuration)} - this will get the {@link org.apache.pig.StoreConfig}
      * reference which has metadata like the location (the string supplied with store statement in the script)
      * and the {@link org.apache.pig.impl.logicalLayer.schema.Schema} of the data. The OutputFormat
      * should NOT use the location in the StoreConfig to write the output if the location represents a 
@@ -66,18 +89,12 @@
      * which will provide a safe output directory into which the OutputFormat should write
      * the part file (given by the name argument in the getRecordWriter() call).
      */
-    public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
-            String name, Progressable progress) throws IOException {
-        Path outputDir = FileOutputFormat.getWorkOutputPath(job);
-        return getRecordWriter(fs, job, outputDir, name, progress);
-    }
-
-    public PigRecordWriter getRecordWriter(FileSystem fs, JobConf job,
-            Path outputDir, String name, Progressable progress)
-            throws IOException {
-        StoreFunc store = MapRedUtil.getStoreFunc(job);
-
-        String parentName = FileOutputFormat.getOutputPath(job).getName();
+    private PigRecordWriter getRecordWriter(FileSystem fs, TaskAttemptContext context,
+                    Path outputDir, String name) throws IOException {
+        
+        StoreFunc store = MapRedUtil.getStoreFunc(context.getConfiguration());
+        
+        String parentName = FileOutputFormat.getOutputPath(context).getName();
         int suffixStart = parentName.lastIndexOf('.');
         if (suffixStart != -1) {
             String suffix = parentName.substring(suffixStart);
@@ -85,22 +102,20 @@
                 name = name + suffix;
             }
         }
-        return new PigRecordWriter(fs, new Path(outputDir, name), store);
-    }
 
-    public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
-        // TODO We really should validate things here
-        return;
+        return new PigRecordWriter(fs, new Path(outputDir, name), store);
     }
 
-    static public class PigRecordWriter implements
-            RecordWriter<WritableComparable, Tuple> {
+    @SuppressWarnings("unchecked")
+    static public class PigRecordWriter
+            extends RecordWriter<WritableComparable, Tuple> {
+        
         private OutputStream os = null;
 
         private StoreFunc sfunc = null;
 
         public PigRecordWriter(FileSystem fs, Path file, StoreFunc sfunc)
-                throws IOException {
+                throws IOException {            
             this.sfunc = sfunc;
             fs.delete(file, true);
             this.os = fs.create(file);
@@ -115,17 +130,66 @@
          * We only care about the values, so we are going to skip the keys when
          * we write.
          * 
-         * @see org.apache.hadoop.mapred.RecordWriter#write(Object, Object)
+         * @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
          */
+        @Override
         public void write(WritableComparable key, Tuple value)
-                throws IOException {
+                throws IOException, InterruptedException{
             this.sfunc.putNext(value);
         }
 
-        public void close(Reporter reporter) throws IOException {
+        @Override
+        public void close(TaskAttemptContext context) 
+                throws IOException, InterruptedException {
             sfunc.finish();
-            os.close();
+            os.close();            
         }
 
     }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) 
+            throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        if (committer == null) {
+            Path output = null;            
+            if (context.getConfiguration().get(PIG_MAPRED_OUTPUT_DIR) != null) {
+                output = new Path(context.getConfiguration().get(PIG_MAPRED_OUTPUT_DIR));
+            } else {
+                output = new Path(context.getConfiguration().get(MAPRED_OUTPUT_DIR));
+            }            
+            committer = new FileOutputCommitter(output, context);
+        }
+        return committer;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public RecordWriter getRecordWriter(
+            TaskAttemptContext context) throws IOException, InterruptedException {
+        FileOutputCommitter committer = (FileOutputCommitter)getOutputCommitter(context);
+        Path outputDir = committer.getWorkPath();
+        Configuration conf = context.getConfiguration();
+        String tmpPath = conf.get(PIG_TMP_PATH);
+        if (tmpPath != null) {
+            outputDir = new Path(committer.getWorkPath(), tmpPath);
+        } 
+        return getRecordWriter(FileSystem.get(conf), context, outputDir, getPartName(conf));
+    }
+    
+    private String getPartName(Configuration conf) {
+        int partition = conf.getInt(MAPRED_TASK_PARTITION, -1);   
+
+        NumberFormat numberFormat = NumberFormat.getInstance();
+        numberFormat.setMinimumIntegerDigits(5);
+        numberFormat.setGroupingUsed(false);
+
+        return "part-" + numberFormat.format(partition);
+    }
+
 }
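
The rewritten PigOutputFormat shows the three obligations of a new-API OutputFormat: checkOutputSpecs(JobContext), getOutputCommitter(TaskAttemptContext) and getRecordWriter(TaskAttemptContext), with FileOutputCommitter supplying the task work path that part files are written into. A compact sketch of the same contract, assuming a simple line-oriented writer and made-up class names (the "mapred.task.partition" lookup mirrors MAPRED_TASK_PARTITION above):

    import java.io.IOException;
    import java.text.NumberFormat;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Illustrative only: writes each value as a line of text into the
    // committer's work path.
    public class LineOutputFormat extends OutputFormat<NullWritable, Text> {

        private FileOutputCommitter committer;

        @Override
        public void checkOutputSpecs(JobContext context)
                throws IOException, InterruptedException {
            // Validation of the output location would go here.
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            if (committer == null) {
                Path output = FileOutputFormat.getOutputPath(context);
                committer = new FileOutputCommitter(output, context);
            }
            return committer;
        }

        @Override
        public RecordWriter<NullWritable, Text> getRecordWriter(
                TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileOutputCommitter oc =
                (FileOutputCommitter) getOutputCommitter(context);
            // Same part-file naming scheme as getPartName() above.
            NumberFormat nf = NumberFormat.getInstance();
            nf.setMinimumIntegerDigits(5);
            nf.setGroupingUsed(false);
            int partition =
                context.getConfiguration().getInt("mapred.task.partition", -1);
            Path file =
                new Path(oc.getWorkPath(), "part-" + nf.format(partition));
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            final FSDataOutputStream out = fs.create(file, false);

            return new RecordWriter<NullWritable, Text>() {
                @Override
                public void write(NullWritable key, Text value)
                        throws IOException, InterruptedException {
                    out.write(value.getBytes(), 0, value.getLength());
                    out.write('\n');
                }

                @Override
                public void close(TaskAttemptContext ctx)
                        throws IOException, InterruptedException {
                    out.close();
                }
            };
        }
    }

Pig's getOutputCommitter() above additionally prefers pig.mapred.output.dir over mapred.output.dir, and getRecordWriter() appends pig.tmp.path to the work path, so that multi-store plans can redirect their output.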

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java Mon Oct 26 18:05:05 2009
@@ -17,17 +17,17 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
 
 public class ProgressableReporter implements PigProgressable {
-    Reporter rep;
-    
+    Progressable rep;
+
     public ProgressableReporter(){
         
     }
-    
-    public ProgressableReporter(Reporter rep) {
+
+    public ProgressableReporter(Progressable rep) {
         super();
         this.rep = rep;
     }
@@ -38,10 +38,10 @@
     }
 
     public void progress(String msg) {
-        rep.setStatus(msg);
+        
     }
 
-    public void setRep(Reporter rep) {
+    public void setRep(Progressable rep) {
         this.rep = rep;
     }
 
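The ProgressableReporter change works because, in the new API, both Mapper.Context and Reducer.Context are Progressables (which is why setRep(context) is called in the hunks above), so one wrapper type serves map and reduce tasks. Progressable only offers progress(); there is no setStatus(), which is why progress(String) above becomes a no-op. A tiny illustrative wrapper, not part of this commit:

    import org.apache.hadoop.util.Progressable;

    // Illustrative only: keep a long-running task alive by reporting
    // progress through whichever Context was handed in.
    public class ProgressKeepAlive {
        private final Progressable rep;

        public ProgressKeepAlive(Progressable rep) {
            this.rep = rep;
        }

        // Tell the framework the task is still alive without changing its
        // status string.
        public void keepAlive() {
            rep.progress();
        }
    }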

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Mon Oct 26 18:05:05 2009
@@ -28,17 +28,19 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.Slice;
@@ -47,7 +49,6 @@
 import org.apache.pig.backend.executionengine.PigSlice;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -57,14 +58,20 @@
 /**
  * Wraps a {@link Slice} in an {@link InputSplit} so it's usable by hadoop.
  */
-public class SliceWrapper implements InputSplit {
+public class SliceWrapper extends InputSplit implements Writable {
 
     private int index;
     private ExecType execType;
     private Slice wrapped;
     private transient FileSystem fs;// transient so it isn't serialized
-    private transient JobConf lastConf;
+    private transient Configuration lastConf;
     private ArrayList<OperatorKey> targetOps;
+    
+    // XXX hadoop 20 new API integration: get around a hadoop 20 bug 
+    // by setting total number of splits to each split so that it can
+    // be passed to the back-end. This value is needed 
+    // by PoissonSampleLoader to compute the number of samples
+    private int totalSplits;
 
     public SliceWrapper() {
         // for deserialization
@@ -77,22 +84,32 @@
         this.fs = fs;
         this.targetOps = targetOps;
     }
-
+        
     public int getIndex() {
         return index;
     }
 
+    public void setTotalSplits(int t) {
+        totalSplits = t;
+    }
+    
+    public int getTotalSplits() {
+        return totalSplits;        
+    }
+    
+    @Override
     public long getLength() throws IOException {
         return wrapped.getLength();
     }
 
+    @Override
     public String[] getLocations() throws IOException {
         if(wrapped instanceof PigSlice) {
             Set<String> locations = new HashSet<String>();
             for (String loc : wrapped.getLocations()) {
                 Path path = new Path(loc);
                 FileStatus status = fs.getFileStatus(path); 
-				BlockLocation[] b = fs.getFileBlockLocations(status, wrapped.getStart(), wrapped.getLength());
+                BlockLocation[] b = fs.getFileBlockLocations(status, wrapped.getStart(), wrapped.getLength());
                 int total = 0;
                 for (int i = 0; i < b.length; i++) {
                     total += b[i].getHosts().length;
@@ -108,76 +125,84 @@
             return locations.toArray(new String[locations.size()]);
         } else {
             return wrapped.getLocations();
-        }
-        
-        
+        }       
     }
 
-    public JobConf getJobConf() {
+    public Configuration getJobConf() {
         return lastConf;
     }
 
+
     @SuppressWarnings("unchecked")
-    public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
-        lastConf = job;        
-        DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
+    public RecordReader<Text, Tuple> makeReader(Configuration conf) throws IOException {
+        lastConf = conf;        
+        DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(conf));
+
         // if the execution is against Mapred DFS, set
         // working dir to /user/<userid>
         if(execType == ExecType.MAPREDUCE)
-            store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
-        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        wrapped.init(store);
+            store.setActiveContainer(store.asContainer("/user/" + conf.get("user.name")));
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
+        wrapped.init(store);                             
         
-        job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
         // Mimic org.apache.hadoop.mapred.FileSplit if feasible...
         String[] locations = wrapped.getLocations();
         if (locations.length > 0) {
-            job.set("map.input.file", locations[0]);    
-            job.setLong("map.input.start", wrapped.getStart());   
-            job.setLong("map.input.length", wrapped.getLength());
+            conf.set("map.input.file", locations[0]);    
+            conf.setLong("map.input.start", wrapped.getStart());   
+            conf.setLong("map.input.length", wrapped.getLength());
         }
         
-        return new RecordReader<Text, Tuple>() {
-
-            TupleFactory tupFac = TupleFactory.getInstance();
-            public void close() throws IOException {
-                wrapped.close();
-            }
+        return new TupleReader();
+    }
+    
+    public class TupleReader extends RecordReader<Text, Tuple> {
+        private Tuple current;
 
-            public Text createKey() {
-                return null; // we never use the key!
-            }
+        TupleFactory tupFac = TupleFactory.getInstance();
+        
+        @Override
+        public void close() throws IOException {
+            wrapped.close();
+        } 
+
+        @Override
+        public float getProgress() throws IOException {
+            return wrapped.getProgress();
+        }
 
-            public Tuple createValue() {
-                return tupFac.newTuple();
-            }
+        @Override
+        public Text getCurrentKey() throws IOException, InterruptedException {
+            return null;
+        }
 
-            public long getPos() throws IOException {
-                return wrapped.getPos();
-            }
+        @Override
+        public Tuple getCurrentValue() throws IOException, InterruptedException {
+            return current;
+        }
 
-            public float getProgress() throws IOException {
-                return wrapped.getProgress();
-            }
+        @Override
+        public void initialize(InputSplit inputsplit, 
+            TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException {
+        }
 
-            public boolean next(Text key, Tuple value) throws IOException {
-                return wrapped.next(value);
-            }
-        };
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            Tuple t = tupFac.newTuple();
+            boolean result = wrapped.next(t);
+            current = t;
+            return result;
+        }
     }
 
     @SuppressWarnings("unchecked")
+    @Override
     public void readFields(DataInput is) throws IOException {
         execType = (ExecType) readObject(is);
         targetOps = (ArrayList<OperatorKey>) readObject(is);
         index = is.readInt();
         wrapped = (Slice) readObject(is);
-    }
-
-    private IOException wrapException(Exception e) {
-        IOException newE = new IOException(e.getMessage());
-        newE.initCause(e);
-        return newE;
+        totalSplits = is.readInt();
     }
 
     private Object readObject(DataInput is) throws IOException {
@@ -194,11 +219,13 @@
         }
     }
 
+    @Override
     public void write(DataOutput os) throws IOException {
         writeObject(execType, os);
         writeObject(targetOps, os);
         os.writeInt(index);
         writeObject(wrapped, os);
+        os.writeInt(totalSplits);
     }
 
     private void writeObject(Serializable obj, DataOutput os)
@@ -218,4 +245,7 @@
         return wrapped;
     }
 
+    public List<OperatorKey> getTargetOperatorKeyList() {
+        return targetOps;
+    }
 }
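
SliceWrapper now extends the new abstract InputSplit (plus Writable for serialization), and its TupleReader follows the new RecordReader contract: initialize(), then nextKeyValue() advancing the reader while getCurrentKey()/getCurrentValue() return the current pair, replacing the old createKey()/createValue()/next(key, value) trio. A self-contained sketch of that reader shape, reading from a hard-coded array purely for illustration:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Illustrative only: the reader pulls records forward with nextKeyValue()
    // and exposes the current pair through getters.
    public class CountingReader extends RecordReader<LongWritable, Text> {

        private final String[] lines = { "a", "b", "c" }; // stand-in data
        private int pos = -1;
        private final LongWritable key = new LongWritable();
        private final Text value = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // A real reader would open the split here, as TupleReader's
            // enclosing makeReader() does with wrapped.init(store).
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (pos + 1 >= lines.length) {
                return false;
            }
            pos++;
            key.set(pos);
            value.set(lines[pos]);
            return true;
        }

        @Override
        public LongWritable getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() {
            return (pos + 1) / (float) lines.length;
        }

        @Override
        public void close() throws IOException {
        }
    }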

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Oct 26 18:05:05 2009
@@ -18,34 +18,16 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
 
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.hadoop.io.RawComparator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.NullableBytesWritable;
-import org.apache.pig.impl.io.NullableDoubleWritable;
-import org.apache.pig.impl.io.NullableFloatWritable;
-import org.apache.pig.impl.io.NullableIntWritable;
-import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullablePartitionWritable;
@@ -62,71 +44,79 @@
   * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned 
   * in a round robin manner.
   */ 
-public class SkewedPartitioner implements Partitioner<PigNullableWritable, Writable> {
-	Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
-	static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
-	Integer totalReducers;
+public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
+    Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
+    static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
+    Integer totalReducers;
+    Configuration conf;
 
+    @Override
     public int getPartition(PigNullableWritable wrappedKey, Writable value,
             int numPartitions) {
-		// for streaming tables, return the partition index blindly
-		if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
-			return (int) ((NullablePartitionWritable)wrappedKey).getPartition();
-		}
-
-		// for partition table, compute the index based on the sampler output
-		Pair <Integer, Integer> indexes;
-		Integer curIndex = -1;
-		Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
-		
-		// extract the key from nullablepartitionwritable
-		PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
-
-		try {
-			keyTuple.set(0, key.getValueAsPigType());
-		} catch (ExecException e) {
-			return -1;
-		}
-		
-		// if the key is not null and key 
-		if (key instanceof NullableTuple && key.getValueAsPigType() != null) {
-			keyTuple = (Tuple)key.getValueAsPigType();
-		}
-
-		indexes = reducerMap.get(keyTuple);
-		// if the reducerMap does not contain the key, do the default hash based partitioning
-		if (indexes == null) {
-			return (Math.abs(keyTuple.hashCode()) % totalReducers);
-		}
-
-		if (currentIndexMap.containsKey(keyTuple)) {
-	    	curIndex = currentIndexMap.get(keyTuple);
-		}
-		
-		if (curIndex >= (indexes.first + indexes.second) || curIndex == -1) {
-			curIndex = indexes.first;
-		} else {
-			curIndex++;
-		}
-		
-		// set it in the map
-		currentIndexMap.put(keyTuple, curIndex);
-		return (curIndex % totalReducers);
-	}
-
-    @SuppressWarnings("unchecked")
-    public void configure(JobConf job) {
+        // for streaming tables, return the partition index blindly
+        if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+            return (int) ((NullablePartitionWritable)wrappedKey).getPartition();
+        }
+
+        // for partition table, compute the index based on the sampler output
+        Pair <Integer, Integer> indexes;
+        Integer curIndex = -1;
+        Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
+
+        // extract the key from nullablepartitionwritable
+        PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
+
+        try {
+            keyTuple.set(0, key.getValueAsPigType());
+        } catch (ExecException e) {
+            return -1;
+        }
+
+        // if the key is not null and key 
+        if (key instanceof NullableTuple && key.getValueAsPigType() != null) {
+            keyTuple = (Tuple)key.getValueAsPigType();
+        }
+
+        indexes = reducerMap.get(keyTuple);
+        // if the reducerMap does not contain the key, do the default hash based partitioning
+        if (indexes == null) {
+            return (Math.abs(keyTuple.hashCode()) % totalReducers);
+        }
+
+        if (currentIndexMap.containsKey(keyTuple)) {
+            curIndex = currentIndexMap.get(keyTuple);
+        }
+
+        if (curIndex >= (indexes.first + indexes.second) || curIndex == -1) {
+            curIndex = indexes.first;
+        } else {
+            curIndex++;
+        }
+
+        // set it in the map
+        currentIndexMap.put(keyTuple, curIndex);
+        return (curIndex % totalReducers);
+    }
+
+    @Override
+    public void setConf(Configuration job) {
+        conf = job;
         String keyDistFile = job.get("pig.keyDistFile", "");
         if (keyDistFile.length() == 0)
             throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
 
-		try {
-			Integer [] redCnt = new Integer[1]; 
-			reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
-			totalReducers = redCnt[0];
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
+        try {
+            Integer [] redCnt = new Integer[1]; 
+            reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
+            totalReducers = redCnt[0];
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
 
 }
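
The change above moves the partitioner from the old mapred API (configure(JobConf)) to the new mapreduce API, where job settings are injected through the Configurable contract before getPartition() is called. A minimal sketch of that pattern follows; the class name and the placeholder hash logic are illustrative only and not part of the patch:

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch only: class name and partitioning logic are placeholders.
    public class ExamplePartitioner extends Partitioner<Text, Writable>
                                    implements Configurable {

        private Configuration conf;

        @Override
        public void setConf(Configuration conf) {
            // called once per task before getPartition(); per-job setup that
            // used to live in configure(JobConf) goes here
            this.conf = conf;
        }

        @Override
        public Configuration getConf() {
            return conf;
        }

        @Override
        public int getPartition(Text key, Writable value, int numPartitions) {
            // plain hash partitioning as placeholder logic
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }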

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Oct 26 18:05:05 2009
@@ -17,27 +17,26 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
-
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -53,14 +52,23 @@
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 
-public class WeightedRangePartitioner implements Partitioner<PigNullableWritable, Writable> {
+public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>   
+                                      implements Configurable {
     PigNullableWritable[] quantiles;
     RawComparator<PigNullableWritable> comparator;
-    public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
-    JobConf job;
+    public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts 
+        = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+    
+    Configuration job;
 
+    @SuppressWarnings("unchecked")
+    @Override
     public int getPartition(PigNullableWritable key, Writable value,
             int numPartitions){
+        if (comparator == null) {
+            comparator = (RawComparator<PigNullableWritable>)PigMapReduce.sJobContext.getSortComparator();
+        }
+        
         if(!weightedParts.containsKey(key)){
             int index = Arrays.binarySearch(quantiles, key, comparator);
             if (index < 0)
@@ -74,10 +82,12 @@
     }
 
     @SuppressWarnings("unchecked")
-    public void configure(JobConf job) {
-        this.job = job;
+    @Override
+    public void setConf(Configuration configuration) {
+        job = configuration;
+
         String quantilesFile = job.get("pig.quantilesFile", "");
-        comparator = job.getOutputKeyComparator();
+
         if (quantilesFile.length() == 0)
             throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
         
@@ -154,10 +164,6 @@
         }
     }
 
-    private boolean areEqual(PigNullableWritable sample, PigNullableWritable writable) {
-        return comparator.compare(sample, writable)==0;
-    }
-
     private void convertToArray(
             DataBag quantilesListAsBag) {
         ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
@@ -193,4 +199,11 @@
         }
         return list;
     }
+
+    @Override
+    public Configuration getConf() {
+        return job;
+    }
+
+
 }
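
WeightedRangePartitioner now obtains its sort comparator lazily from PigMapReduce.sJobContext, since a bare Configuration has no getOutputKeyComparator(). The range lookup itself is a binary search over the sampled quantile keys; the self-contained sketch below uses made-up String keys and a plain comparator to show how a negative binarySearch result is mapped back to a range index (the exact-match branch here is illustrative rather than copied from the Pig source):

    import java.util.Arrays;
    import java.util.Comparator;

    // Illustration only: quantile values and comparator are made up.
    public class RangeLookupExample {

        static int findRange(String[] quantiles, String key, Comparator<String> cmp) {
            int index = Arrays.binarySearch(quantiles, key, cmp);
            if (index < 0) {
                index = -index - 1;   // insertion point: first quantile greater than key
            } else {
                index = index + 1;    // an exact match falls into the following range
            }
            return index;             // 0 .. quantiles.length
        }

        public static void main(String[] args) {
            String[] quantiles = { "d", "m", "t" };   // boundaries produced by a sampling job
            Comparator<String> cmp = Comparator.naturalOrder();
            System.out.println(findRange(quantiles, "a", cmp));  // 0
            System.out.println(findRange(quantiles, "m", cmp));  // 2
            System.out.println(findRange(quantiles, "z", cmp));  // 3
        }
    }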

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Oct 26 18:05:05 2009
@@ -4,37 +4,31 @@
 package org.apache.pig.backend.hadoop.executionengine.util;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.Progressable;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTupleFactory;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.data.Tuple;
-import java.io.InputStream;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * A class of utility static methods to be used in the hadoop map reduce backend
@@ -51,7 +45,7 @@
      * @return the StoreFunc reference
      * @throws ExecException
      */
-    public static StoreFunc getStoreFunc(JobConf conf) throws ExecException {
+    public static StoreFunc getStoreFunc(Configuration conf) throws ExecException {
         StoreFunc store;
         try {
             String storeFunc = conf.get("pig.storeFunc", "");
@@ -82,80 +76,80 @@
      * an OutputFormat to write the data
      * @throws IOException
      */
-    public static StoreConfig getStoreConfig(JobConf conf) throws IOException {
+    public static StoreConfig getStoreConfig(Configuration conf) throws IOException {
         return (StoreConfig) ObjectSerializer.deserialize(conf.get(JobControlCompiler.PIG_STORE_CONFIG));
     }
 
-	/**
-	 * Loads the key distribution sampler file
+    /**
+     * Loads the key distribution sampler file
      *
      * @param keyDistFile the name for the distribution file
      * @param totalReducers gets set to the total number of reducers as found in the dist file
      * @param job Ref to the job Configuration object
      * @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case.
-	 */	
-	@SuppressWarnings("unchecked")
-	public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String keyDistFile,
-								 Integer[] totalReducers, JobConf job, byte keyType) throws IOException {
-
-		Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
-		
-		InputStream is;
-		if (job != null) {
-			is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
-		} else {
-			is = FileLocalizer.openDFSFile(keyDistFile);
-		}
-		BinStorage loader = new BinStorage();
-		DataBag partitionList;
-		loader.bindTo(keyDistFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
-		Tuple t = loader.getNext();
-		if(t==null) {
-			throw new RuntimeException("Empty samples file");
-		}
-		// The keydist file is structured as (key, min, max)
-		// min, max being the index of the reducers
-		Map<String, Object > distMap = (Map<String, Object>) t.get (0);
-		partitionList = (DataBag) distMap.get("partition.list");
-		totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
-		Iterator<Tuple> it = partitionList.iterator();
-		while (it.hasNext()) {
-			Tuple idxTuple = it.next();
-			Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
-			Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
-			// Used to replace the maxIndex with the number of reducers
-			if (maxIndex < minIndex) {
-				maxIndex = totalReducers[0] + maxIndex; 
-			}
-			E keyT;
-
-			// if the join is on more than 1 key
-			if (idxTuple.size() > 3) {
-				// remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
-				// it in the reducer map
-				Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
-				for (int i=0; i < idxTuple.size() - 2; i++) {
-					keyTuple.append(idxTuple.get(i));	
-				}
-				keyT = (E) keyTuple;
-			} else {
-				if (keyType == DataType.TUPLE) {
-					keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
-					((Tuple)keyT).set(0,idxTuple.get(0));
-				} else {
-					keyT = (E) idxTuple.get(0);
-				}
-			}
-			// number of reducers
-			Integer cnt = 0;
-			if (minIndex < maxIndex) {
-				cnt = maxIndex - minIndex;
-			} else {
-				cnt = totalReducers[0] + maxIndex - minIndex;
-			}
-
-			reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
-		}		
-		return reducerMap;
-	}
+     */
+    @SuppressWarnings("unchecked")
+    public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String keyDistFile,
+                                        Integer[] totalReducers, Configuration job, byte keyType) throws IOException {
+
+        Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
+
+        InputStream is;
+        if (job != null) {
+            is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
+        } else {
+            is = FileLocalizer.openDFSFile(keyDistFile);
+        }
+        BinStorage loader = new BinStorage();
+        DataBag partitionList;
+        loader.bindTo(keyDistFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+        Tuple t = loader.getNext();
+        if(t==null) {
+            throw new RuntimeException("Empty samples file");
+        }
+        // The keydist file is structured as (key, min, max),
+        // min and max being the reducer indexes assigned to that key
+        Map<String, Object > distMap = (Map<String, Object>) t.get (0);
+        partitionList = (DataBag) distMap.get("partition.list");
+        totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
+        Iterator<Tuple> it = partitionList.iterator();
+        while (it.hasNext()) {
+            Tuple idxTuple = it.next();
+            Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+            Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+            // the range wraps past the last reducer; unwrap maxIndex by adding the total reducer count
+            if (maxIndex < minIndex) {
+                maxIndex = totalReducers[0] + maxIndex; 
+            }
+            E keyT;
+
+            // if the join is on more than 1 key
+            if (idxTuple.size() > 3) {
+                // remove the last 2 fields of the tuple, i.e. minIndex and maxIndex,
+                // and store the remaining key fields in the reducer map
+                Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
+                for (int i=0; i < idxTuple.size() - 2; i++) {
+                    keyTuple.append(idxTuple.get(i));	
+                }
+                keyT = (E) keyTuple;
+            } else {
+                if (keyType == DataType.TUPLE) {
+                    keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
+                    ((Tuple)keyT).set(0,idxTuple.get(0));
+                } else {
+                    keyT = (E) idxTuple.get(0);
+                }
+            }
+            // width of the reducer index range for this key
+            Integer cnt = 0;
+            if (minIndex < maxIndex) {
+                cnt = maxIndex - minIndex;
+            } else {
+                cnt = totalReducers[0] + maxIndex - minIndex;
+            }
+
+            reducerMap.put(keyT, new Pair(minIndex, cnt)); // minIndex is the first reducer in the range, cnt its width
+        }
+        return reducerMap;
+    }
 }
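
loadPartitionFile() returns a map from join key to Pair(minIndex, cnt), with the total reducer count reported through the Integer[] out-parameter. The wrap-around arithmetic is easiest to see with concrete numbers; the sketch below uses invented values (10 reducers, a key assigned the range [8, 1]) and mirrors the index handling above together with the modulo applied in the skewed partitioner:

    // Worked example with invented numbers; mirrors the arithmetic above.
    public class ReducerRangeExample {
        public static void main(String[] args) {
            int totalReducers = 10;
            int minIndex = 8, maxIndex = 1;   // range read from the key distribution file

            // the range wraps past the last reducer, so unwrap maxIndex first
            if (maxIndex < minIndex) {
                maxIndex = totalReducers + maxIndex;   // 11
            }
            int cnt = maxIndex - minIndex;             // 3

            // the partitioner cycles curIndex over [minIndex, minIndex + cnt] and
            // applies modulo, so this key is spread over reducers 8, 9, 0, 1
            for (int curIndex = minIndex; curIndex <= minIndex + cnt; curIndex++) {
                System.out.println(curIndex % totalReducers);
            }
        }
    }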

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Mon Oct 26 18:05:05 2009
@@ -23,12 +23,12 @@
 import java.util.Date;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -57,7 +57,7 @@
       return "part-" + NUMBER_FORMAT.format(partition);
     }
 
-    JobConf job;
+    Configuration job;
     
     String scriptOutputDir;
     String scriptLogDir;
@@ -168,10 +168,6 @@
      */
     private boolean writeErrorToHDFS(int limit, String taskId) {
         if (command.getPersistStderr()) {
-            // These are hard-coded begin/end offsets a Hadoop *taskid*
-            int beginIndex = 25, endIndex = 31;   
-
-            //int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
             int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();
         }
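
The hard-coded substring offsets for extracting the task id are replaced with TaskAttemptID.forName(), which parses the attempt id string properly. A small usage sketch, with an invented attempt id string:

    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Sketch: the attempt id string is made up for illustration.
    public class TaskIdExample {
        public static void main(String[] args) {
            String taskId = "attempt_200910261805_0001_m_000007_0";
            int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
            System.out.println(tipId);   // prints 7
        }
    }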

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Mon Oct 26 18:05:05 2009
@@ -26,16 +26,15 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Stack;
-import java.util.Properties ;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -43,13 +42,13 @@
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
 import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.WrappedIOException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 
 public class FileLocalizer {
     private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -155,7 +154,7 @@
     public static InputStream openDFSFile(String fileName) throws IOException {
         SliceWrapper wrapper = PigInputFormat.getActiveSplit();
 
-        JobConf conf = null;
+        Configuration conf = null;
         if (wrapper == null) {
         	conf = PigMapReduce.sJobConf;
         }else{
@@ -180,7 +179,7 @@
     public static long getSize(String fileName) throws IOException {
     	SliceWrapper wrapper = PigInputFormat.getActiveSplit();
     	
-    	JobConf conf = null;
+    	Configuration conf = null;
     	if (wrapper == null) {
     		conf = PigMapReduce.sJobConf;
     	}else{
@@ -456,12 +455,6 @@
             initialized = true;
             relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
             toDelete.push(relativeRoot);
-            // Runtime.getRuntime().addShutdownHook(new Thread() {
-              //   @Override
-            //     public void run() {
-            //          deleteTempFiles();
-            //     }
-            //});
         }
     }
 
@@ -593,7 +586,6 @@
             }
         }
         catch (DataStorageException e) {
-            //throw WrappedIOException.wrap("Unable to get collect for pattern " + elem.toString(), e);
             throw e;
         }
     }