Posted to commits@pig.apache.org by da...@apache.org on 2011/07/19 03:01:59 UTC

svn commit: r1148117 [2/3] - in /pig/trunk: ./ shims/ shims/src/ shims/src/hadoop20/ shims/src/hadoop20/org/ shims/src/hadoop20/org/apache/ shims/src/hadoop20/org/apache/pig/ shims/src/hadoop20/org/apache/pig/backend/ shims/src/hadoop20/org/apache/pig/...

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * This class holds the static Mapper &amp; Reducer classes that
+ * are used by Pig to execute Pig Map Reduce jobs. Since
+ * there is a reduce phase, the leaf is bound to be a
+ * POLocalRearrange. So the map phase has to separate the
+ * key and tuple and collect them into the output
+ * collector.
+ *
+ * The shuffle and sort phase sorts these keys &amp; tuples,
+ * groups the tuples into a List&lt;Tuple&gt; per key and passes the key and
+ * an iterator over that list to the reducer. The deserialized POPackage operator
+ * is used to package the key, List&lt;Tuple&gt; into pigKey,
+ * Bag&lt;Tuple&gt; where pigKey is of the appropriate pig type and
+ * then the result of the package is attached to the reduce
+ * plan, which is executed if it's not empty. Either the result
+ * of the reduce plan or the package result is collected into
+ * the output collector.
+ *
+ * The index of the tuple (that is, which bag it should be placed in by the
+ * package) is packed into the key.  This is done so that hadoop sorts the
+ * keys in order of index for join.
+ *
+ * This class is the base class for PigMapReduce, which differs slightly
+ * among different versions of hadoop. The PigMapReduce implementation
+ * is located in $PIG_HOME/shims.
+ */
+public class PigGenericMapReduce {
+
+    public static JobContext sJobContext = null;
+    
+    /**
+     * @deprecated Use {@link UDFContext} instead in the following way to get 
+     * the job's {@link Configuration}:
+     * <pre>UDFContext.getUDFContext().getJobConf()</pre>
+     */
+    @Deprecated
+    public static Configuration sJobConf = null;
+    
+    public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
+    private final static Tuple DUMMYTUPLE = null;
+    
+    public static class Map extends PigMapBase {
+
+        @Override
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
+            Byte index = (Byte)tuple.get(0);
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(tuple.get(1), keyType);
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            key.setIndex(index);
+            val.setIndex(index);
+
+            oc.write(key, val);
+        }
+    }
+    
+    /**
+     * This "specialized" map class is ONLY to be used in Pig queries whose
+     * order by uses a UDF comparator. A UDF used for comparison in the order by
+     * expects to be handed tuples. Hence this map class ensures that the "key"
+     * used in the order by is wrapped into a tuple (if it isn't already a tuple).
+     */
+    public static class MapWithComparator extends PigMapBase {
+
+        @Override
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
+            Object keyTuple = null;
+            if(keyType != DataType.TUPLE) {
+                Object k = tuple.get(1);
+                keyTuple = tf.newTuple(k);
+            } else {
+                keyTuple = tuple.get(1);
+            }
+            
+
+            Byte index = (Byte)tuple.get(0);
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            key.setIndex(index);
+            val.setIndex(index);
+
+            oc.write(key, val);
+        }
+    }
+
+    /**
+     * Used by Skewed Join
+     */
+    public static class MapWithPartitionIndex extends Map {
+
+        @Override
+        public void collect(Context oc, Tuple tuple) 
+                throws InterruptedException, IOException {
+            
+            Byte tupleKeyIdx = 2;
+            Byte tupleValIdx = 3;
+
+            Byte index = (Byte)tuple.get(0);
+            Integer partitionIndex = -1;
+            // for partitioning table, the partition index isn't present
+            if (tuple.size() == 3) {
+                //super.collect(oc, tuple);
+                //return;
+                tupleKeyIdx--;
+                tupleValIdx--;
+            } else {
+                partitionIndex = (Integer)tuple.get(1);
+            }
+
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
+
+            NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+            
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            wrappedKey.setIndex(index);
+            
+            // set the partition
+            wrappedKey.setPartition(partitionIndex);
+            val.setIndex(index);
+            oc.write(wrappedKey, val);
+        }
+
+        @Override
+        protected void runPipeline(PhysicalOperator leaf) 
+                throws IOException, InterruptedException {
+            
+            while(true){
+                Result res = leaf.getNext(DUMMYTUPLE);
+                
+                if(res.returnStatus==POStatus.STATUS_OK){
+                    // For POPartitionRearrange, the result is a bag. 
+                    // This operator is used for skewed join
+                    if (res.result instanceof DataBag) {
+                        Iterator<Tuple> its = ((DataBag)res.result).iterator();
+                        while(its.hasNext()) {
+                            collect(outputCollector, its.next());
+                        }
+                    }else{
+                        collect(outputCollector, (Tuple)res.result);
+                    }
+                    continue;
+                }
+                
+                if(res.returnStatus==POStatus.STATUS_EOP) {
+                    return;
+                }
+
+                if(res.returnStatus==POStatus.STATUS_NULL) {
+                    continue;
+                }
+
+                if(res.returnStatus==POStatus.STATUS_ERR){
+                    // remember that we had an issue so that in 
+                    // close() we can do the right thing
+                    errorInMap  = true;
+                    // if there is an errmessage use it
+                    String errMsg;
+                    if(res.result != null) {
+                        errMsg = "Received Error while " +
+                            "processing the map plan: " + res.result;
+                    } else {
+                        errMsg = "Received Error while " +
+                            "processing the map plan.";
+                    }
+
+                    int errCode = 2055;
+                    throw new ExecException(errMsg, errCode, PigException.BUG);
+                }
+            }
+        }
+    }
+
+    abstract public static class Reduce 
+            extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+        
+        protected final Log log = LogFactory.getLog(getClass());
+        
+        //The reduce plan
+        protected PhysicalPlan rp = null;
+
+        // Store operators
+        protected List<POStore> stores;
+        
+        //The POPackage operator which is the
+        //root of every Map Reduce plan is
+        //obtained through the job conf. The portion
+        //remaining after its removal is the reduce
+        //plan
+        protected POPackage pack;
+        
+        ProgressableReporter pigReporter;
+
+        protected Context outputCollector;
+
+        protected boolean errorInReduce = false;
+        
+        PhysicalOperator[] roots;
+
+        private PhysicalOperator leaf;
+        
+        PigContext pigContext = null;
+        protected volatile boolean initialized = false;
+        
+        private boolean inIllustrator = false;
+        
+        /**
+         * Set the reduce plan: to be used by local runner for illustrator
+         * @param plan Reduce plan
+         */
+        public void setReducePlan(PhysicalPlan plan) {
+            rp = plan;
+        }
+
+        /**
+         * Configures the Reduce plan, the POPackage operator
+         * and the reporter thread
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            inIllustrator = (context instanceof PigMapReduce.Reduce.IllustratorContext);
+            if (inIllustrator)
+                pack = ((PigMapReduce.Reduce.IllustratorContext) context).pack;
+            Configuration jConf = context.getConfiguration();
+            SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+            sJobContext = context;
+            sJobConfInternal.set(context.getConfiguration());
+            sJobConf = context.getConfiguration();
+            try {
+                PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
+                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+                
+                if (rp == null)
+                    rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
+                            .get("pig.reducePlan"));
+                stores = PlanHelper.getStores(rp);
+
+                if (!inIllustrator)
+                    pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
+                // To be removed
+                if(rp.isEmpty())
+                    log.debug("Reduce Plan empty!");
+                else{
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    rp.explain(baos);
+                    log.debug(baos.toString());
+                }
+                pigReporter = new ProgressableReporter();
+                if(!(rp.isEmpty())) {
+                    roots = rp.getRoots().toArray(new PhysicalOperator[1]);
+                    leaf = rp.getLeaves().get(0);
+                }
+                
+                // Get the UDF specific context
+                MapRedUtil.setupUDFContext(jConf);
+            
+            } catch (IOException ioe) {
+                String msg = "Problem while configuring reduce plan.";
+                throw new RuntimeException(msg, ioe);
+            }
+        }
+        
+        /**
+         * The reduce function which packages the key and List&lt;Tuple&gt;
+         * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
+         * The package result is either collected as is, if the reduce plan is
+         * empty, or after passing through the reduce plan.
+         */       
+        @Override
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+                throws IOException, InterruptedException {            
+            
+            if (!initialized) {
+                initialized = true;
+                
+                // cache the collector for use in runPipeline()
+                // which could additionally be called from close()
+                this.outputCollector = context;
+                pigReporter.setRep(context);
+                PhysicalOperator.setReporter(pigReporter);
+
+                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+                PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                pigHadoopLogger.setAggregate(aggregateWarning);
+                PigStatusReporter.setContext(context);
+                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+                
+                PhysicalOperator.setPigLogger(pigHadoopLogger);
+
+                if (!inIllustrator)
+                    for (POStore store: stores) {
+                        MapReducePOStoreImpl impl 
+                            = new MapReducePOStoreImpl(context);
+                        store.setStoreImpl(impl);
+                        store.setUp();
+                    }
+            }
+          
+            // In the case where we optimize the join, we combine
+            // POPackage and POForeach - so we could get many
+            // tuples out of the getNext() call of POJoinPackage.
+            // In this case, we process till we see EOP from
+            // POJoinPackage.getNext()
+            if (pack instanceof POJoinPackage)
+            {
+                pack.attachInput(key, tupIter.iterator());
+                while (true)
+                {
+                    if (processOnePackageOutput(context))
+                        break;
+                }
+            }
+            else {
+                // join is not optimized, so package will
+                // give only one tuple out for the key
+                pack.attachInput(key, tupIter.iterator());
+                processOnePackageOutput(context);
+            } 
+        }
+        
+        // return: false-more output
+        //         true- end of processing
+        public boolean processOnePackageOutput(Context oc) 
+                throws IOException, InterruptedException {
+
+            Result res = pack.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                Tuple packRes = (Tuple)res.result;
+                
+                if(rp.isEmpty()){
+                    oc.write(null, packRes);
+                    return false;
+                }
+                for (int i = 0; i < roots.length; i++) {
+                    roots[i].attachInput(packRes);
+                }
+                runPipeline(leaf);
+                
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL) {
+                return false;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                int errCode = 2093;
+                String msg = "Encountered error in package operator while processing group.";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return true;
+            }
+                
+            return false;
+            
+        }
+        
+        /**
+         * @param leaf
+         * @throws InterruptedException
+         * @throws IOException 
+         */
+        protected void runPipeline(PhysicalOperator leaf) 
+                throws InterruptedException, IOException {
+            
+            while(true)
+            {
+                Result redRes = leaf.getNext(DUMMYTUPLE);
+                if(redRes.returnStatus==POStatus.STATUS_OK){
+                    try{
+                        outputCollector.write(null, (Tuple)redRes.result);
+                    }catch(Exception e) {
+                        throw new IOException(e);
+                    }
+                    continue;
+                }
+                
+                if(redRes.returnStatus==POStatus.STATUS_EOP) {
+                    return;
+                }
+                
+                if(redRes.returnStatus==POStatus.STATUS_NULL) {
+                    continue;
+                }
+                
+                if(redRes.returnStatus==POStatus.STATUS_ERR){
+                    // remember that we had an issue so that in 
+                    // close() we can do the right thing
+                    errorInReduce   = true;
+                    // if there is an errmessage use it
+                    String msg;
+                    if(redRes.result != null) {
+                        msg = "Received Error while " +
+                        "processing the reduce plan: " + redRes.result;
+                    } else {
+                        msg = "Received Error while " +
+                        "processing the reduce plan.";
+                    }
+                    int errCode = 2090;
+                    throw new ExecException(msg, errCode, PigException.BUG);
+                }
+            }
+        }
+        
+        /**
+         * Will be called once all the intermediate keys and values are
+         * processed. So right place to stop the reporter thread.
+         */
+        @Override 
+        protected void cleanup(Context context) throws IOException, InterruptedException {
+            super.cleanup(context);
+            
+            if(errorInReduce) {
+                // there was an error in reduce - just return
+                return;
+            }
+            
+            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
+                // If there is a stream in the pipeline we could
+                // potentially have more to process - so let's
+                // set the flag stating that all input has been sent
+                // already and then let's run the pipeline one more time.
+                // This will result in nothing happening in the case
+                // where there is no stream in the pipeline.
+                rp.endOfAllInput = true;
+                runPipeline(leaf);
+            }
+
+            for (POStore store: stores) {
+                if (!initialized) {
+                    MapReducePOStoreImpl impl 
+                        = new MapReducePOStoreImpl(context);
+                    store.setStoreImpl(impl);
+                    store.setUp();
+                }
+                store.tearDown();
+            }
+                        
+            //Calling EvalFunc.finish()
+            UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
+            try {
+                finisher.visit();
+            } catch (VisitorException e) {
+                throw new IOException("Error trying to finish UDFs",e);
+            }
+            
+            PhysicalOperator.setReporter(null);
+            initialized = false;
+        }
+        
+        /**
+         * Get reducer's illustrator context
+         * 
+         * @param input Input buffer as output by maps
+         * @param pkg package
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        abstract public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
+    }
+    
+    /**
+     * This "specialized" reduce class is ONLY to be used in Pig queries whose
+     * order by uses a UDF comparator. A UDF used for comparison in the order by
+     * expects to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
+     * ensures that the "key" used in the order by is wrapped into a tuple (if it
+     * isn't already a tuple). This reduce class unwraps that tuple in the case where
+     * the map had wrapped the key and hands the "unwrapped" key to the POPackage
+     * for processing.
+     */
+    public static class ReduceWithComparator extends PigMapReduce.Reduce {
+        
+        private byte keyType;
+        
+        /**
+         * Configures the Reduce plan, the POPackage operator
+         * and the reporter thread
+         */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            keyType = pack.getKeyType();
+        }
+
+        /**
+         * The reduce function which packages the key and List&lt;Tuple&gt;
+         * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
+         * The package result is either collected as is, if the reduce plan is
+         * empty, or after passing through the reduce plan.
+         */
+        @Override
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+                throws IOException, InterruptedException {
+            
+            if (!initialized) {
+                initialized = true;
+                
+                // cache the collector for use in runPipeline()
+                // which could additionally be called from close()
+                this.outputCollector = context;
+                pigReporter.setRep(context);
+                PhysicalOperator.setReporter(pigReporter);
+
+                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                
+                PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                pigHadoopLogger.setAggregate(aggregateWarning);
+                PigStatusReporter.setContext(context);
+                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+                PhysicalOperator.setPigLogger(pigHadoopLogger);
+                
+                for (POStore store: stores) {
+                    MapReducePOStoreImpl impl 
+                        = new MapReducePOStoreImpl(context);
+                    store.setStoreImpl(impl);
+                    store.setUp();
+                }
+            }
+            
+            // If the keyType is not a tuple, the MapWithComparator.collect()
+            // would have wrapped the key into a tuple so that the 
+            // comparison UDF used in the order by can process it.
+            // We need to unwrap the key out of the tuple and hand it
+            // to the POPackage for processing
+            if(keyType != DataType.TUPLE) {
+                Tuple t = (Tuple)(key.getValueAsPigType());
+                try {
+                    key = HDataType.getWritableComparableTypes(t.get(0), keyType);
+                } catch (ExecException e) {
+                    throw e;
+                }
+            }
+            
+            pack.attachInput(key, tupIter.iterator());
+            
+            Result res = pack.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                Tuple packRes = (Tuple)res.result;
+                
+                if(rp.isEmpty()){
+                    context.write(null, packRes);
+                    return;
+                }
+                
+                rp.attachInput(packRes);
+
+                List<PhysicalOperator> leaves = rp.getLeaves();
+                
+                PhysicalOperator leaf = leaves.get(0);
+                runPipeline(leaf);
+                
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL) {
+                return;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                int errCode = 2093;
+                String msg = "Encountered error in package operator while processing group.";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+
+        }
+
+    }
+   
+}
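
The @deprecated note on PigGenericMapReduce.sJobConf above points UDFs at UDFContext instead of the static field. A minimal sketch of a UDF reading the job Configuration that way (assuming only the standard EvalFunc and UDFContext APIs; the class name and the property looked up below are illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    // Hypothetical UDF, shown only to illustrate the access pattern the
    // @deprecated Javadoc on sJobConf recommends.
    public class ConfEchoFunc extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            // Preferred over reading PigMapReduce.sJobConf directly:
            // fetch the job's Configuration through the UDFContext.
            Configuration conf = UDFContext.getUDFContext().getJobConf();
            return conf == null ? null : conf.get("mapred.job.name");
        }
    }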

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Tue Jul 19 01:01:53 2011
@@ -47,6 +47,7 @@ import org.apache.pig.CollectableLoadFun
 import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -266,7 +267,7 @@ public class PigInputFormat extends Inpu
                 // get the InputFormat from it and ask for splits
                 InputFormat inpFormat = loadFunc.getInputFormat();
                 List<InputSplit> oneInputSplits = inpFormat.getSplits(
-                        new JobContext(inputSpecificJob.getConfiguration(), 
+                        HadoopShims.createJobContext(inputSpecificJob.getConfiguration(), 
                                 jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(), combinable, confClone);
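
The change above routes JobContext construction through the new HadoopShims layer, so the same PigInputFormat source can work against Hadoop versions whose JobContext is constructed differently. A plausible hadoop20-flavored sketch of the shim (the signature matches the call site above; the body is an assumption, not taken from this diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;

    public class HadoopShims {
        // Hadoop 0.20 still exposes a public JobContext constructor, so a
        // 0.20 shim can simply delegate to it; a 0.23 shim would hand back
        // a JobContextImpl instead.
        public static JobContext createJobContext(Configuration conf, JobID jobId) {
            return new JobContext(conf, jobId);
        }
    }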

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-
-public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
-    private static final Tuple DUMMYTUPLE = null;
-
-    private final Log log = LogFactory.getLog(getClass());
-    
-    protected byte keyType;
-        
-    //Map Plan
-    protected PhysicalPlan mp = null;
-
-    // Store operators
-    protected List<POStore> stores;
-
-    protected TupleFactory tf = TupleFactory.getInstance();
-    
-    boolean inIllustrator = false;
-    
-    Context outputCollector;
-    
-    // Reporter that will be used by operators
-    // to transmit heartbeat
-    ProgressableReporter pigReporter;
-
-    protected boolean errorInMap = false;
-    
-    PhysicalOperator[] roots;
-
-    private PhysicalOperator leaf;
-
-    PigContext pigContext = null;
-    private volatile boolean initialized = false;
-    
-    /**
-     * for local map/reduce simulation
-     * @param plan the map plan
-     */
-    public void setMapPlan(PhysicalPlan plan) {
-        mp = plan;
-    }
-    
-    /**
-     * Will be called when all the tuples in the input
-     * are done. So reporter thread should be closed.
-     */
-    @Override
-    public void cleanup(Context context) throws IOException, InterruptedException {
-        super.cleanup(context);
-        if(errorInMap) {
-            //error in map - returning
-            return;
-        }
-            
-        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
-            // If there is a stream in the pipeline or if this map job belongs to merge-join we could 
-            // potentially have more to process - so lets
-            // set the flag stating that all map input has been sent
-            // already and then lets run the pipeline one more time
-            // This will result in nothing happening in the case
-            // where there is no stream or it is not a merge-join in the pipeline
-            mp.endOfAllInput = true;
-            runPipeline(leaf);
-        }
-
-        for (POStore store: stores) {
-            if (!initialized) {
-                MapReducePOStoreImpl impl 
-                    = new MapReducePOStoreImpl(context);
-                store.setStoreImpl(impl);
-                store.setUp();
-            }
-            store.tearDown();
-        }
-        
-        //Calling EvalFunc.finish()
-        UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
-        try {
-            finisher.visit();
-        } catch (VisitorException e) {
-            int errCode = 2121;
-            String msg = "Error while calling finish method on UDFs.";
-            throw new VisitorException(msg, errCode, PigException.BUG, e);
-        }
-        
-        mp = null;
-
-        PhysicalOperator.setReporter(null);
-        initialized = false;
-    }
-
-    /**
-     * Configures the mapper with the map plan and the
-     * reproter thread
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setup(Context context) throws IOException, InterruptedException {       	
-        super.setup(context);
-        
-        Configuration job = context.getConfiguration();
-        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
-        PigMapReduce.sJobContext = context;
-        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
-        PigMapReduce.sJobConf = context.getConfiguration();
-        inIllustrator = (context instanceof IllustratorContext);
-        
-        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        if (pigContext.getLog4jProperties()!=null)
-            PropertyConfigurator.configure(pigContext.getLog4jProperties());
-        
-        if (mp == null)
-            mp = (PhysicalPlan) ObjectSerializer.deserialize(
-                job.get("pig.mapPlan"));
-        stores = PlanHelper.getStores(mp);
-        
-        // To be removed
-        if(mp.isEmpty())
-            log.debug("Map Plan empty!");
-        else{
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            mp.explain(baos);
-            log.debug(baos.toString());
-        }
-        keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
-        // till here
-        
-        pigReporter = new ProgressableReporter();
-        // Get the UDF specific context
-        MapRedUtil.setupUDFContext(job);
-
-        if(!(mp.isEmpty())) {
-
-            PigSplit split = (PigSplit)context.getInputSplit();
-            List<OperatorKey> targetOpKeys = split.getTargetOps();
-            
-            ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
-            for (OperatorKey targetKey : targetOpKeys) {                    
-                targetOpsAsList.add(mp.getOperator(targetKey));
-            }
-            roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
-            leaf = mp.getLeaves().get(0);               
-        }
-        
-        PigStatusReporter.setContext(context);
- 
-    }
-    
-    /**
-     * The map function that attaches the inpTuple appropriately
-     * and executes the map plan if its not empty. Collects the
-     * result of execution into oc or the input directly to oc
-     * if map plan empty. The collection is left abstract for the
-     * map-only or map-reduce job to implement. Map-only collects
-     * the tuple as-is whereas map-reduce collects it after extracting
-     * the key and indexed tuple.
-     */   
-    @Override
-    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {     
-        if(!initialized) {
-            initialized  = true;
-            // cache the collector for use in runPipeline() which
-            // can be called from close()
-            this.outputCollector = context;
-            pigReporter.setRep(context);
-            PhysicalOperator.setReporter(pigReporter);
-           
-            for (POStore store: stores) {
-                MapReducePOStoreImpl impl 
-                    = new MapReducePOStoreImpl(context);
-                store.setStoreImpl(impl);
-                if (!pigContext.inIllustrator)
-                    store.setUp();
-            }
-            
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
-            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-            pigHadoopLogger.setAggregate(aggregateWarning);           
-            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
-            PhysicalOperator.setPigLogger(pigHadoopLogger);
-        }
-        
-        if (mp.isEmpty()) {
-            collect(context,inpTuple);
-            return;
-        }
-        
-        for (PhysicalOperator root : roots) {
-            if (inIllustrator) {
-                if (root != null) {
-                    root.attachInput(inpTuple);
-                }
-            } else {
-                root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
-            }
-        }
-            
-        runPipeline(leaf);
-    }
-
-    protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
-        while(true){
-            Result res = leaf.getNext(DUMMYTUPLE);
-            if(res.returnStatus==POStatus.STATUS_OK){
-                collect(outputCollector,(Tuple)res.result);
-                continue;
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_EOP) {
-                return;
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_NULL)
-                continue;
-            
-            if(res.returnStatus==POStatus.STATUS_ERR){
-                // remember that we had an issue so that in 
-                // close() we can do the right thing
-                errorInMap  = true;
-                // if there is an errmessage use it
-                String errMsg;
-                if(res.result != null) {
-                    errMsg = "Received Error while " +
-                    "processing the map plan: " + res.result;
-                } else {
-                    errMsg = "Received Error while " +
-                    "processing the map plan.";
-                }
-                    
-                int errCode = 2055;
-                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
-                throw ee;
-            }
-        }
-        
-    }
-
-    abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
-
-    /**
-     * @return the keyType
-     */
-    public byte getKeyType() {
-        return keyType;
-    }
-
-    /**
-     * @param keyType the keyType to set
-     */
-    public void setKeyType(byte keyType) {
-        this.keyType = keyType;
-    }
-    
-    /**
-     * 
-     * Get mapper's illustrator context
-     * 
-     * @param conf  Configuration
-     * @param input Input bag to serve as data source
-     * @param output Map output buffer
-     * @param split the split
-     * @return Illustrator's context
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public Context getIllustratorContext(Configuration conf, DataBag input,
-          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
-          throws IOException, InterruptedException {
-        return new IllustratorContext(conf, input, output, split);
-    }
-    
-    public class IllustratorContext extends Context {
-        private DataBag input;
-        List<Pair<PigNullableWritable, Writable>> output;
-        private Iterator<Tuple> it = null;
-        private Tuple value = null;
-        private boolean init  = false;
-
-        public IllustratorContext(Configuration conf, DataBag input,
-              List<Pair<PigNullableWritable, Writable>> output,
-              InputSplit split) throws IOException, InterruptedException {
-              super(conf, new TaskAttemptID(), null, null, null, null, split);
-              if (output == null)
-                  throw new IOException("Null output can not be used");
-              this.input = input; this.output = output;
-        }
-        
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            if (input == null) {
-                if (!init) {
-                    init = true;
-                    return true;
-                }
-                return false;
-            }
-            if (it == null)
-                it = input.iterator();
-            if (!it.hasNext())
-                return false;
-            value = it.next();
-            return true;
-        }
-        
-        @Override
-        public Text getCurrentKey() {
-          return null;
-        }
-        
-        @Override
-        public Tuple getCurrentValue() {
-          return value;
-        }
-        
-        @Override
-        public void write(PigNullableWritable key, Writable value) 
-            throws IOException, InterruptedException {
-            output.add(new Pair<PigNullableWritable, Writable>(key, value));
-        }
-        
-        @Override
-        public void progress() {
-          
-        }
-    }
-}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Collections;
-import java.util.Comparator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.pen.FakeRawKeyValueIterator;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-
-/**
- * This class is the static Mapper &amp; Reducer classes that
- * are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a 
- * POLocalRearrange. So the map phase has to separate the
- * key and tuple and collect it into the output
- * collector.
- * 
- * The shuffle and sort phase sorts these keys &amp; tuples
- * and creates key, List&lt;Tuple&gt; and passes the key and
- * iterator to the list. The deserialized POPackage operator
- * is used to package the key, List&lt;Tuple&gt; into pigKey, 
- * Bag&lt;Tuple&gt; where pigKey is of the appropriate pig type and
- * then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result 
- * of the reduce plan or the package res is collected into
- * the output collector. 
- *
- * The index of the tuple (that is, which bag it should be placed in by the
- * package) is packed into the key.  This is done so that hadoop sorts the
- * keys in order of index for join.
- *
- */
-public class PigMapReduce {
-
-    public static JobContext sJobContext = null;
-    
-    /**
-     * @deprecated Use {@link UDFContext} instead in the following way to get 
-     * the job's {@link Configuration}:
-     * <pre>UdfContext.getUdfContext().getJobConf()</pre>
-     */
-    @Deprecated
-    public static Configuration sJobConf = null;
-    
-    public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
-    private final static Tuple DUMMYTUPLE = null;
-    
-    public static class Map extends PigMapBase {
-
-        @Override
-        public void collect(Context oc, Tuple tuple) 
-                throws InterruptedException, IOException {
-            
-            Byte index = (Byte)tuple.get(0);
-            PigNullableWritable key =
-                HDataType.getWritableComparableTypes(tuple.get(1), keyType);
-            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
-            // Both the key and the value need the index.  The key needs it so
-            // that it can be sorted on the index in addition to the key
-            // value.  The value needs it so that POPackage can properly
-            // assign the tuple to its slot in the projection.
-            key.setIndex(index);
-            val.setIndex(index);
-
-            oc.write(key, val);
-        }
-    }
-    
-    /**
-     * This "specialized" map class is ONLY to be used in pig queries with
-     * order by a udf. A UDF used for comparison in the order by expects
-     * to be handed tuples. Hence this map class ensures that the "key" used
-     * in the order by is wrapped into a tuple (if it isn't already a tuple)
-     */
-    public static class MapWithComparator extends PigMapBase {
-
-        @Override
-        public void collect(Context oc, Tuple tuple) 
-                throws InterruptedException, IOException {
-            
-            Object keyTuple = null;
-            if(keyType != DataType.TUPLE) {
-                Object k = tuple.get(1);
-                keyTuple = tf.newTuple(k);
-            } else {
-                keyTuple = tuple.get(1);
-            }
-            
-
-            Byte index = (Byte)tuple.get(0);
-            PigNullableWritable key =
-                HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
-            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
-            // Both the key and the value need the index.  The key needs it so
-            // that it can be sorted on the index in addition to the key
-            // value.  The value needs it so that POPackage can properly
-            // assign the tuple to its slot in the projection.
-            key.setIndex(index);
-            val.setIndex(index);
-
-            oc.write(key, val);
-        }
-    }
-
-    /**
-     * Used by Skewed Join
-     */
-    public static class MapWithPartitionIndex extends Map {
-
-        @Override
-        public void collect(Context oc, Tuple tuple) 
-                throws InterruptedException, IOException {
-            
-            Byte tupleKeyIdx = 2;
-            Byte tupleValIdx = 3;
-
-            Byte index = (Byte)tuple.get(0);
-			Integer partitionIndex = -1;
-        	// for partitioning table, the partition index isn't present
-			if (tuple.size() == 3) {
-				//super.collect(oc, tuple);
-				//return;
-				tupleKeyIdx--;
-				tupleValIdx--;
-			} else {
-				partitionIndex = (Integer)tuple.get(1);
-			}
-
-            PigNullableWritable key =
-                HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
-
-            NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
-
-            NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-            
-            // Both the key and the value need the index.  The key needs it so
-            // that it can be sorted on the index in addition to the key
-            // value.  The value needs it so that POPackage can properly
-            // assign the tuple to its slot in the projection.
-            wrappedKey.setIndex(index);
-            
-            // set the partition
-            wrappedKey.setPartition(partitionIndex);
-            val.setIndex(index);
-            oc.write(wrappedKey, val);
-        }
-
-        @Override
-        protected void runPipeline(PhysicalOperator leaf) 
-                throws IOException, InterruptedException {
-            
-            while(true){
-                Result res = leaf.getNext(DUMMYTUPLE);
-                
-                if(res.returnStatus==POStatus.STATUS_OK){
-                    // For POPartitionRearrange, the result is a bag. 
-                    // This operator is used for skewed join
-                    if (res.result instanceof DataBag) {
-                        Iterator<Tuple> its = ((DataBag)res.result).iterator();
-                        while(its.hasNext()) {
-                            collect(outputCollector, its.next());
-                        }
-                    }else{
-                        collect(outputCollector, (Tuple)res.result);
-                    }
-                    continue;
-                }
-                
-                if(res.returnStatus==POStatus.STATUS_EOP) {
-                    return;
-                }
-
-                if(res.returnStatus==POStatus.STATUS_NULL) {
-                    continue;
-                }
-
-                if(res.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
-                    // close() we can do the right thing
-                    errorInMap  = true;
-                    // if there is an errmessage use it
-                    String errMsg;
-                    if(res.result != null) {
-                        errMsg = "Received Error while " +
-                            "processing the map plan: " + res.result;
-                    } else {
-                        errMsg = "Received Error while " +
-                            "processing the map plan.";
-                    }
-
-                    int errCode = 2055;
-                    throw new ExecException(errMsg, errCode, PigException.BUG);
-                }
-            }
-        }
-    }
-
-    public static class Reduce 
-            extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
-        protected final Log log = LogFactory.getLog(getClass());
-        
-        //The reduce plan
-        protected PhysicalPlan rp = null;
-
-        // Store operators
-        protected List<POStore> stores;
-        
-        //The POPackage operator which is the
-        //root of every Map Reduce plan is
-        //obtained through the job conf. The portion
-        //remaining after its removal is the reduce
-        //plan
-        protected POPackage pack;
-        
-        ProgressableReporter pigReporter;
-
-        protected Context outputCollector;
-
-        protected boolean errorInReduce = false;
-        
-        PhysicalOperator[] roots;
-
-        private PhysicalOperator leaf;
-        
-        PigContext pigContext = null;
-        protected volatile boolean initialized = false;
-        
-        private boolean inIllustrator = false;
-        
-        /**
-         * Set the reduce plan; to be used by the local runner for the illustrator.
-         * @param plan the reduce plan
-         */
-        public void setReducePlan(PhysicalPlan plan) {
-            rp = plan;
-        }
-
-        /**
-         * Configures the Reduce plan, the POPackage operator
-         * and the reporter thread
-         */
-        @SuppressWarnings("unchecked")
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            inIllustrator = (context instanceof IllustratorContext);
-            if (inIllustrator)
-                pack = ((IllustratorContext) context).pack;
-            Configuration jConf = context.getConfiguration();
-            SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
-            sJobContext = context;
-            sJobConfInternal.set(context.getConfiguration());
-            sJobConf = context.getConfiguration();
-            try {
-                PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                
-                if (rp == null)
-                    rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
-                            .get("pig.reducePlan"));
-                stores = PlanHelper.getStores(rp);
-
-                if (!inIllustrator)
-                    pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
-                // To be removed
-                if(rp.isEmpty())
-                    log.debug("Reduce Plan empty!");
-                else{
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    rp.explain(baos);
-                    log.debug(baos.toString());
-                }
-                pigReporter = new ProgressableReporter();
-                if(!(rp.isEmpty())) {
-                    roots = rp.getRoots().toArray(new PhysicalOperator[1]);
-                    leaf = rp.getLeaves().get(0);
-                }
-                
-                // Get the UDF specific context
-            	MapRedUtil.setupUDFContext(jConf);
-            
-            } catch (IOException ioe) {
-                String msg = "Problem while configuring reduce plan.";
-                throw new RuntimeException(msg, ioe);
-            }
-        }
-        
-        /**
-         * The reduce function, which packages the key and List&lt;Tuple&gt;
-         * into key, Bag&lt;Tuple&gt; after converting the Hadoop key type into the Pig type.
-         * The packaged result is collected as is if the reduce plan is
-         * empty, or after passing through the reduce plan otherwise.
-         */
-        @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
-                throws IOException, InterruptedException {            
-            
-            if (!initialized) {
-                initialized = true;
-                
-                // cache the collector for use in runPipeline()
-                // which could additionally be called from close()
-                this.outputCollector = context;
-                pigReporter.setRep(context);
-                PhysicalOperator.setReporter(pigReporter);
-
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
-                PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-                pigHadoopLogger.setAggregate(aggregateWarning);
-                PigStatusReporter.setContext(context);
-                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-                
-                PhysicalOperator.setPigLogger(pigHadoopLogger);
-
-                if (!inIllustrator)
-                    for (POStore store: stores) {
-                        MapReducePOStoreImpl impl 
-                            = new MapReducePOStoreImpl(context);
-                        store.setStoreImpl(impl);
-                        store.setUp();
-                    }
-            }
-          
-            // If the join is optimized, POPackage and POForeach are
-            // combined into POJoinPackage, so a single call to its
-            // getNext() can yield many tuples. In that case, we keep
-            // processing until we see EOP from
-            // POJoinPackage.getNext().
-            if (pack instanceof POJoinPackage)
-            {
-                pack.attachInput(key, tupIter.iterator());
-                while (true)
-                {
-                    if (processOnePackageOutput(context))
-                        break;
-                }
-            }
-            else {
-                // join is not optimized, so package will
-                // give only one tuple out for the key
-                pack.attachInput(key, tupIter.iterator());
-                processOnePackageOutput(context);
-            } 
-        }
-        
-        // return: false - more output to come
-        //         true  - end of processing
-        public boolean processOnePackageOutput(Context oc) 
-                throws IOException, InterruptedException {
-
-            Result res = pack.getNext(DUMMYTUPLE);
-            if(res.returnStatus==POStatus.STATUS_OK){
-                Tuple packRes = (Tuple)res.result;
-                
-                if(rp.isEmpty()){
-                    oc.write(null, packRes);
-                    return false;
-                }
-                for (int i = 0; i < roots.length; i++) {
-                    roots[i].attachInput(packRes);
-                }
-                runPipeline(leaf);
-                
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_NULL) {
-                return false;
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_ERR){
-                int errCode = 2093;
-                String msg = "Encountered error in package operator while processing group.";
-                throw new ExecException(msg, errCode, PigException.BUG);
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_EOP) {
-                return true;
-            }
-                
-            return false;
-            
-        }
-        
-        /**
-         * Runs the reduce-side pipeline from the given leaf, writing each
-         * result tuple to the output collector.
-         * @param leaf the leaf operator of the reduce plan
-         * @throws InterruptedException
-         * @throws IOException
-         */
-        protected void runPipeline(PhysicalOperator leaf) 
-                throws InterruptedException, IOException {
-            
-            while(true)
-            {
-                Result redRes = leaf.getNext(DUMMYTUPLE);
-                if(redRes.returnStatus==POStatus.STATUS_OK){
-                    try{
-                        outputCollector.write(null, (Tuple)redRes.result);
-                    }catch(Exception e) {
-                        throw new IOException(e);
-                    }
-                    continue;
-                }
-                
-                if(redRes.returnStatus==POStatus.STATUS_EOP) {
-                    return;
-                }
-                
-                if(redRes.returnStatus==POStatus.STATUS_NULL) {
-                    continue;
-                }
-                
-                if(redRes.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
-                    // close() we can do the right thing
-                    errorInReduce   = true;
-                    // if there is an errmessage use it
-                    String msg;
-                    if(redRes.result != null) {
-                        msg = "Received Error while " +
-                        "processing the reduce plan: " + redRes.result;
-                    } else {
-                        msg = "Received Error while " +
-                        "processing the reduce plan.";
-                    }
-                    int errCode = 2090;
-                    throw new ExecException(msg, errCode, PigException.BUG);
-                }
-            }
-        }
-        
-        /**
-         * Called once all the intermediate keys and values have been
-         * processed, so this is the right place to stop the reporter thread.
-         */
-        @Override 
-        protected void cleanup(Context context) throws IOException, InterruptedException {
-            super.cleanup(context);
-            
-            if(errorInReduce) {
-                // there was an error in reduce - just return
-                return;
-            }
-            
-            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
-                // If there is a stream in the pipeline we could 
-                // potentially have more to process - so lets
-                // set the flag stating that all map input has been sent
-                // already and then lets run the pipeline one more time
-                // This will result in nothing happening in the case
-                // where there is no stream in the pipeline
-                rp.endOfAllInput = true;
-                runPipeline(leaf);
-            }
-
-            for (POStore store: stores) {
-                if (!initialized) {
-                    MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(context);
-                    store.setStoreImpl(impl);
-                    store.setUp();
-                }
-                store.tearDown();
-            }
-                        
-            //Calling EvalFunc.finish()
-            UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
-            try {
-                finisher.visit();
-            } catch (VisitorException e) {
-                throw new IOException("Error trying to finish UDFs",e);
-            }
-            
-            PhysicalOperator.setReporter(null);
-            initialized = false;
-        }
-        
-        /**
-         * Get the reducer's illustrator context.
-         * 
-         * @param job the job being illustrated
-         * @param input input buffer holding the output of the maps
-         * @param pkg the POPackage operator
-         * @return the reducer's illustrator context
-         * @throws IOException
-         * @throws InterruptedException
-         */
-        public Context getIllustratorContext(Job job,
-               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
-            return new IllustratorContext(job, input, pkg);
-        }
-        
-        @SuppressWarnings("unchecked")
-        public class IllustratorContext extends Context {
-            private PigNullableWritable currentKey = null, nextKey = null;
-            private NullableTuple nextValue = null;
-            private List<NullableTuple> currentValues = null;
-            private Iterator<Pair<PigNullableWritable, Writable>> it;
-            private final ByteArrayOutputStream bos;
-            private final DataOutputStream dos;
-            private final RawComparator sortComparator, groupingComparator;
-            POPackage pack = null;
-
-            public IllustratorContext(Job job,
-                  List<Pair<PigNullableWritable, Writable>> input,
-                  POPackage pkg
-                  ) throws IOException, InterruptedException {
-                super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
-                    null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
-                bos = new ByteArrayOutputStream();
-                dos = new DataOutputStream(bos);
-                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
-                sortComparator = nwJob.getSortComparator();
-                groupingComparator = nwJob.getGroupingComparator();
-                
-                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
-                        @Override
-                        public int compare(Pair<PigNullableWritable, Writable> o1,
-                                           Pair<PigNullableWritable, Writable> o2) {
-                            try {
-                                o1.first.write(dos);
-                                int l1 = bos.size();
-                                o2.first.write(dos);
-                                int l2 = bos.size();
-                                byte[] bytes = bos.toByteArray();
-                                bos.reset();
-                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
-                            } catch (IOException e) {
-                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
-                            }
-                        }
-                    }
-                );
-                currentValues = new ArrayList<NullableTuple>();
-                it = input.iterator();
-                if (it.hasNext()) {
-                    Pair<PigNullableWritable, Writable> entry = it.next();
-                    nextKey = entry.first;
-                    nextValue = (NullableTuple) entry.second;
-                }
-                pack = pkg;
-            }
-            
-            @Override
-            public PigNullableWritable getCurrentKey() {
-                return currentKey;
-            }
-            
-            @Override
-            public boolean nextKey() {
-                if (nextKey == null)
-                    return false;
-                currentKey = nextKey;
-                currentValues.clear();
-                currentValues.add(nextValue);
-                nextKey = null;
-                for(; it.hasNext(); ) {
-                    Pair<PigNullableWritable, Writable> entry = it.next();
-                    /* Why can't raw comparison be used?
-                    byte[] bytes;
-                    int l1, l2;
-                    try {
-                        currentKey.write(dos);
-                        l1 = bos.size();
-                        entry.first.write(dos);
-                        l2 = bos.size();
-                        bytes = bos.toByteArray();
-                    } catch (IOException e) {
-                        throw new RuntimeException("nextKey exception : "+e.getMessage());
-                    }
-                    bos.reset();
-                    if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
-                    */
-                    if (groupingComparator.compare(currentKey, entry.first) == 0)
-                    {
-                        currentValues.add((NullableTuple)entry.second);
-                    } else {
-                        nextKey = entry.first;
-                        nextValue = (NullableTuple) entry.second;
-                        break;
-                    }
-                }
-                return true;
-            }
-            
-            @Override
-            public Iterable<NullableTuple> getValues() {
-                return currentValues;
-            }
-            
-            @Override
-            public void write(PigNullableWritable k, Writable t) {
-            }
-            
-            @Override
-            public void progress() { 
-            }
-        }
-    }
-    
-    /**
-     * This "specialized" reduce class is ONLY to be used in Pig queries that
-     * order by a UDF. A UDF used for comparison in the order by expects
-     * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
-     * ensures that the "key" used in the order by is wrapped in a tuple (if it
-     * isn't already a tuple). This reduce class unwraps the tuple when the map
-     * had wrapped the key, and hands the "unwrapped" key to the POPackage
-     * for processing.
-     */
-    public static class ReduceWithComparator extends PigMapReduce.Reduce {
-        
-        private byte keyType;
-        
-        /**
-         * Configures the Reduce plan, the POPackage operator
-         * and the reporter thread
-         */
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            keyType = pack.getKeyType();
-        }
-
-        /**
-         * The reduce function, which packages the key and List&lt;Tuple&gt;
-         * into key, Bag&lt;Tuple&gt; after converting the Hadoop key type into the Pig type.
-         * The packaged result is collected as is if the reduce plan is
-         * empty, or after passing through the reduce plan otherwise.
-         */
-        @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
-                throws IOException, InterruptedException {
-            
-            if (!initialized) {
-                initialized = true;
-                
-                // cache the collector for use in runPipeline()
-                // which could additionally be called from close()
-                this.outputCollector = context;
-                pigReporter.setRep(context);
-                PhysicalOperator.setReporter(pigReporter);
-
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-                
-                PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-                pigHadoopLogger.setAggregate(aggregateWarning);
-                PigStatusReporter.setContext(context);
-                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
-                PhysicalOperator.setPigLogger(pigHadoopLogger);
-                
-                for (POStore store: stores) {
-                    MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(context);
-                    store.setStoreImpl(impl);
-                    store.setUp();
-                }
-            }
-            
-            // If the keyType is not a tuple, MapWithComparator.collect()
-            // will have wrapped the key in a tuple so that the
-            // comparison UDF used in the order by can process it.
-            // We need to unwrap the key out of the tuple and hand it
-            // to the POPackage for processing.
-            if(keyType != DataType.TUPLE) {
-                Tuple t = (Tuple)(key.getValueAsPigType());
-                try {
-                    key = HDataType.getWritableComparableTypes(t.get(0), keyType);
-                } catch (ExecException e) {
-                    throw e;
-                }
-            }
-            
-            pack.attachInput(key, tupIter.iterator());
-            
-            Result res = pack.getNext(DUMMYTUPLE);
-            if(res.returnStatus==POStatus.STATUS_OK){
-                Tuple packRes = (Tuple)res.result;
-                
-                if(rp.isEmpty()){
-                    context.write(null, packRes);
-                    return;
-                }
-                
-                rp.attachInput(packRes);
-
-                List<PhysicalOperator> leaves = rp.getLeaves();
-                
-                PhysicalOperator leaf = leaves.get(0);
-                runPipeline(leaf);
-                
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_NULL) {
-                return;
-            }
-            
-            if(res.returnStatus==POStatus.STATUS_ERR){
-                int errCode = 2093;
-                String msg = "Encountered error in package operator while processing group.";
-                throw new ExecException(msg, errCode, PigException.BUG);
-            }
-
-        }
-
-    }
-   
-}

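For context on the order-by-with-UDF path removed above: MapWithComparator wraps a non-tuple key in a single-field tuple so the comparator UDF always receives a Tuple, and ReduceWithComparator undoes that before handing the key to POPackage. Below is a minimal sketch of that handshake, using only the HDataType/DataType calls visible in the removed code; the wrapKey/unwrapKey helper names are invented for illustration and do not exist in Pig.

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.PigNullableWritable;

public class OrderByKeySketch {

    private static final TupleFactory TF = TupleFactory.getInstance();

    // Map side (MapWithComparator): make sure the order-by key handed to the
    // comparator UDF is always a tuple.  Invented helper name.
    static PigNullableWritable wrapKey(Object key, byte keyType) throws ExecException {
        if (keyType == DataType.TUPLE) {
            return HDataType.getWritableComparableTypes(key, keyType);
        }
        Tuple wrapper = TF.newTuple(1);
        wrapper.set(0, key);
        return HDataType.getWritableComparableTypes(wrapper, DataType.TUPLE);
    }

    // Reduce side (ReduceWithComparator): undo the wrapping before the key
    // reaches POPackage, mirroring the keyType check in reduce() above.
    static PigNullableWritable unwrapKey(PigNullableWritable key, byte keyType)
            throws ExecException {
        if (keyType == DataType.TUPLE) {
            return key;
        }
        Tuple wrapper = (Tuple) key.getValueAsPigType();
        return HDataType.getWritableComparableTypes(wrapper.get(0), keyType);
    }
}
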
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Jul 19 01:01:53 2011
@@ -31,6 +31,7 @@ import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -103,7 +104,7 @@ public class PigOutputCommitter extends 
         MapRedUtil.setupUDFContext(context.getConfiguration());
         // make a copy of the context so that the actions after this call
         // do not end up updating the same context
-        TaskAttemptContext contextCopy = new TaskAttemptContext(
+        TaskAttemptContext contextCopy = HadoopShims.createTaskAttemptContext(
                 context.getConfiguration(), context.getTaskAttemptID());
         
         // call setLocation() on the storeFunc so that if there are any
@@ -118,8 +119,9 @@ public class PigOutputCommitter extends 
             POStore store) throws IOException {
         // make a copy of the context so that the actions after this call
         // do not end up updating the same context
-        JobContext contextCopy = new JobContext(
+        JobContext contextCopy = HadoopShims.createJobContext(
                 context.getConfiguration(), context.getJobID());
+        MapRedUtil.setupUDFContext(context.getConfiguration());
         
         // call setLocation() on the storeFunc so that if there are any
         // side effects like setting map.output.dir on the Configuration
@@ -165,7 +167,7 @@ public class PigOutputCommitter extends 
 
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {        
-        if(context.getTaskAttemptID().isMap()) {
+        if(HadoopShims.isMap(context.getTaskAttemptID())) {
             for (Pair<OutputCommitter, POStore> mapCommitter : 
                 mapOutputCommitters) {
                 TaskAttemptContext updatedContext = setUpContext(context, 
@@ -184,7 +186,7 @@ public class PigOutputCommitter extends 
     
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
-        if(context.getTaskAttemptID().isMap()) {
+        if(HadoopShims.isMap(context.getTaskAttemptID())) {
             for (Pair<OutputCommitter, POStore> mapCommitter : 
                 mapOutputCommitters) {
                 TaskAttemptContext updatedContext = setUpContext(context, 
@@ -205,7 +207,7 @@ public class PigOutputCommitter extends 
     public boolean needsTaskCommit(TaskAttemptContext context)
             throws IOException {
         boolean needCommit = false;
-        if(context.getTaskAttemptID().isMap()) {
+        if(HadoopShims.isMap(context.getTaskAttemptID())) {
             for (Pair<OutputCommitter, POStore> mapCommitter : 
                 mapOutputCommitters) {
                 TaskAttemptContext updatedContext = setUpContext(context, 
@@ -244,7 +246,7 @@ public class PigOutputCommitter extends 
     
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
-        if(context.getTaskAttemptID().isMap()) {
+        if(HadoopShims.isMap(context.getTaskAttemptID())) {
             for (Pair<OutputCommitter, POStore> mapCommitter : 
                 mapOutputCommitters) {
                 TaskAttemptContext updatedContext = setUpContext(context, 

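The PigOutputCommitter hunks above (and the PigOutputFormat and PigFile hunks below) route all JobContext/TaskAttemptContext construction and the map-task test through HadoopShims, so that differences between Hadoop versions stay isolated in the shim sources added under shims/. The shim itself is not shown in this excerpt; a hadoop20-style version reconstructed purely from these call sites might look roughly like the following sketch, which may differ from what the commit actually ships.

package org.apache.pig.backend.hadoop.executionengine.shims;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class HadoopShims {

    // On Hadoop 0.20 JobContext and TaskAttemptContext are concrete classes,
    // so the shim can construct them directly; a shim for a newer Hadoop
    // would return the corresponding *Impl classes instead.  Sketch only.
    public static JobContext createJobContext(Configuration conf, JobID jobId) {
        return new JobContext(conf, jobId);
    }

    public static TaskAttemptContext createTaskAttemptContext(Configuration conf,
            TaskAttemptID taskAttemptId) {
        return new TaskAttemptContext(conf, taskAttemptId);
    }

    public static boolean isMap(TaskAttemptID taskAttemptId) {
        return taskAttemptId.isMap();
    }
}
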
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Jul 19 01:01:53 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -192,7 +193,7 @@ public class PigOutputFormat extends Out
         for (POStore store : stores) {
             // make a copy of the original JobContext so that
             // each OutputFormat get a different copy 
-            JobContext jobContextCopy = new JobContext(
+            JobContext jobContextCopy = HadoopShims.createJobContext(
                     jobcontext.getConfiguration(), jobcontext.getJobID());
             
             // set output location

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Tue Jul 19 01:01:53 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -37,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
@@ -47,12 +50,14 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>   
                                       implements Configurable {
     PigNullableWritable[] quantiles;
     RawComparator<PigNullableWritable> comparator;
+    PigContext pigContext;
     final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts 
         = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
     
@@ -85,6 +90,13 @@ public class WeightedRangePartitioner ex
     public void setConf(Configuration configuration) {
         job = configuration;
         
+        try {
+            pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        } catch (IOException e1) {
+            // should not happen
+            e1.printStackTrace();
+        }
+
         String quantilesFile = configuration.get("pig.quantilesFile", "");
 
         if (quantilesFile.length() == 0) {
@@ -96,7 +108,12 @@ public class WeightedRangePartitioner ex
             
             
             // use local file system to get the quantilesFile
-            Configuration conf = new Configuration(false);
+            Configuration conf;
+            if (pigContext.getExecType()==ExecType.MAPREDUCE) {
+                conf = new Configuration(true);
+            } else {
+                conf = new Configuration(false);
+            }
             if (configuration.get("fs.file.impl")!=null)
                 conf.set("fs.file.impl", configuration.get("fs.file.impl"));
             if (configuration.get("fs.hdfs.impl")!=null)

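The WeightedRangePartitioner change above deserializes the PigContext that the front end ships in the job configuration and uses the exec type to decide whether the Configuration used to read the quantiles file should load the default resources (needed on a real cluster to resolve the file system implementations) or stay empty as before (local mode). A rough sketch of both ends of that round trip follows; it assumes the front end stores the context under the same "pig.pigContext" key read above, and the helper class is invented for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;

public class PigContextRoundTrip {

    // Front end (job setup): serialize the PigContext into the job conf.
    static void storeContext(Configuration conf, PigContext pigContext) throws IOException {
        conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
    }

    // Back end (e.g. Partitioner.setConf): read it back and pick a Configuration.
    static Configuration confForQuantilesFile(Configuration jobConf) throws IOException {
        PigContext pigContext =
            (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
        // loadDefaults=true pulls in the default resources (core-default.xml,
        // core-site.xml), which a task on a real cluster needs; local runs
        // keep the old empty Configuration.
        boolean onCluster = pigContext.getExecType() == ExecType.MAPREDUCE;
        return new Configuration(onCluster);
    }
}
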
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 19 01:01:53 2011
@@ -142,6 +142,7 @@ public class PigContext implements Seria
         this.execType = execType;
         this.properties = properties;   
 
+        this.properties.setProperty("exectype", this.execType.name());
         String pigJar = JarManager.findContainingJar(Main.class);
         String hadoopJar = JarManager.findContainingJar(FileSystem.class);
         if (pigJar != null) {

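With the PigContext constructor now recording the execution type as a plain property, code that only receives the properties (rather than the full PigContext object) can recover it. A small illustrative snippet; the helper class and the MAPREDUCE fallback are invented, only the "exectype" key comes from the hunk above.

import java.util.Properties;

import org.apache.pig.ExecType;

public class ExecTypeLookup {

    // Recover the exec type recorded by the PigContext constructor above.
    // Falls back to MAPREDUCE if the property is missing (hypothetical default).
    static ExecType execTypeFrom(Properties props) {
        String name = props.getProperty("exectype");
        return name == null ? ExecType.MAPREDUCE : ExecType.valueOf(name);
    }
}
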
Modified: pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigFile.java Tue Jul 19 01:01:53 2011
@@ -34,6 +34,7 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -41,7 +42,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 
-
 public class PigFile {
     private String file = null;
     boolean append = false;
@@ -70,7 +70,7 @@ public class PigFile {
     public void store(DataBag data, FuncSpec storeFuncSpec, PigContext pigContext) throws IOException {
         Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
         // create a simulated JobContext
-        JobContext jc = new JobContext(conf, new JobID());
+        JobContext jc = HadoopShims.createJobContext(conf, new JobID());
         StoreFuncInterface sfunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(
                 storeFuncSpec);
         OutputFormat<?,?> of = sfunc.getOutputFormat();
@@ -80,7 +80,7 @@ public class PigFile {
         PigOutputFormat.setLocation(jc, store);
         OutputCommitter oc;
         // create a simulated TaskAttemptContext
-        TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+        TaskAttemptContext tac = HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID());
         PigOutputFormat.setLocation(tac, store);
         RecordWriter<?,?> rw ;
         try {