Posted to commits@pig.apache.org by da...@apache.org on 2011/07/19 03:01:59 UTC

svn commit: r1148117 [1/3] - in /pig/trunk: ./ shims/ shims/src/ shims/src/hadoop20/ shims/src/hadoop20/org/ shims/src/hadoop20/org/apache/ shims/src/hadoop20/org/apache/pig/ shims/src/hadoop20/org/apache/pig/backend/ shims/src/hadoop20/org/apache/pig/...

Author: daijy
Date: Tue Jul 19 01:01:53 2011
New Revision: 1148117

URL: http://svn.apache.org/viewvc?rev=1148117&view=rev
Log:
Make Pig work with hadoop .NEXT

Added:
    pig/trunk/shims/
    pig/trunk/shims/src/
    pig/trunk/shims/src/hadoop20/
    pig/trunk/shims/src/hadoop20/org/
    pig/trunk/shims/src/hadoop20/org/apache/
    pig/trunk/shims/src/hadoop20/org/apache/pig/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/shims/src/hadoop23/
    pig/trunk/shims/src/hadoop23/org/
    pig/trunk/shims/src/hadoop23/org/apache/
    pig/trunk/shims/src/hadoop23/org/apache/pig/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/shims/test/
    pig/trunk/shims/test/hadoop20/
    pig/trunk/shims/test/hadoop20/org/
    pig/trunk/shims/test/hadoop20/org/apache/
    pig/trunk/shims/test/hadoop20/org/apache/pig/
    pig/trunk/shims/test/hadoop20/org/apache/pig/test/
    pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
    pig/trunk/shims/test/hadoop23/
    pig/trunk/shims/test/hadoop23/org/
    pig/trunk/shims/test/hadoop23/org/apache/
    pig/trunk/shims/test/hadoop23/org/apache/pig/
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/io/PigFile.java
    pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
    pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
    pig/trunk/src/overview.html
    pig/trunk/test/findbugsExcludeFile.xml
    pig/trunk/test/org/apache/pig/test/MiniCluster.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestTypedMap.java
    pig/trunk/test/org/apache/pig/test/TestUDFContext.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jul 19 01:01:53 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2125: Make Pig work with hadoop .NEXT (daijy)
+
 PIG-2143: Make PigStorage optionally store schema; improve docs. (dvryaboy) 
 
 PIG-1973: UDFContext.getUDFContext usage of ThreadLocal pattern 

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Tue Jul 19 01:01:53 2011
@@ -20,6 +20,8 @@
 	xmlns:ivy="antlib:org.apache.ivy.ant">
     <!-- Load all the default properties, and any the user wants    -->
     <!-- to contribute (without having to type -D or edit this file -->
+    <taskdef resource="net/sf/antcontrib/antcontrib.properties"/>
+
     <property file="${user.home}/build.properties" />
     <property file="${basedir}/build.properties" />
 
@@ -145,6 +147,16 @@
 	<property name="ivy.repo.dir" value="${user.home}/ivyrepo" />
     <property name="ivy.dir" location="ivy" />
     <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
+
+    <!--propertyregex property="hadoopversion"
+           input="${hadoop-core.version}"
+           regexp="\d+\.(\d+)\.\d+"
+           select="\1" /-->
+    <property name="hadoopversion" value="20" />
+
+    <property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
+    <property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+
     <property name="hadoop.jar" value="hadoop-core-${hadoop-core.version}.jar" />
 	<property name="asfrepo" value="https://repository.apache.org"/>
 	<property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
@@ -194,7 +206,7 @@
     <!-- setup the classpath -->
     <path id="classpath">
 	<path refid="compile.classpath"/>	
-        <fileset file="${lib.dir}/${automaton.jarfile}" />
+        <fileset dir="${lib.dir}" includes="*.jar"/>
         <fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
     </path>
 
@@ -349,13 +361,13 @@
         <echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
                 
         <antcall target="compile-sources" inheritRefs="true" inheritall="true">
-            <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
+            <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}" />
             <param name="dist" value="${build.classes}" />
             <param name="cp" value="classpath" />
         </antcall>
     
         <antcall target="compile-sources-all-warnings" inheritRefs="true" inheritall="true">
-            <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
+            <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}" />
             <param name="dist" value="${build.classes}" />
             <param name="cp" value="classpath" />
         </antcall>
@@ -369,7 +381,7 @@
         <echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
         
         <antcall target="compile-sources" inheritRefs="true" inheritall="true">
-            <param name="sources" value="${test.src.dir}" />
+            <param name="sources" value="${test.src.dir};${src.shims.test.dir}" />
             <param name="dist" value="${test.build.classes}" />
             <param name="cp" value="test.classpath" />
             <!-- don't compile PigTestLoader -->
@@ -377,7 +389,7 @@
         </antcall>
 
         <antcall target="compile-sources-all-warnings" inheritRefs="true" inheritall="true">
-            <param name="sources" value="${test.src.dir}" />
+            <param name="sources" value="${test.src.dir};${src.shims.test.dir}" />
             <param name="dist" value="${test.build.classes}" />
             <param name="cp" value="test.classpath" />
             <!-- don't compile PigTestLoader -->
@@ -667,7 +679,7 @@
         <mkdir dir="${test.log.dir}"/>
         <tempfile property="junit.tmp.dir" prefix="pig_junit_tmp" destDir="${java.io.tmpdir}" />
         <mkdir dir="${junit.tmp.dir}/"/>
-        <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+        <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="1024m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
             <sysproperty key="ssh.gateway" value="${ssh.gateway}" />
             <sysproperty key="hod.server" value="${hod.server}" />
             <sysproperty key="java.io.tmpdir" value="${junit.tmp.dir}" />

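Note: the propertyregex block above is left commented out, so hadoopversion is hard-wired to 20 and the build compiles the shims under shims/src/hadoop20 and shims/test/hadoop20. Since an Ant <property> only takes effect when the property is not already set, building against the Hadoop 23 shims should just be a matter of overriding it on the command line; a hedged example, not taken from this commit:

    ant -Dhadoopversion=23 jar
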
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+    /**
+     * 
+     * Get mapper's illustrator context
+     * 
+     * @param conf  Configuration
+     * @param input Input bag to serve as data source
+     * @param output Map output buffer
+     * @param split the split
+     * @return Illustrator's context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+          throws IOException, InterruptedException {
+        return new IllustratorContext(conf, input, output, split);
+    }
+    
+    public class IllustratorContext extends Context {
+        private DataBag input;
+        List<Pair<PigNullableWritable, Writable>> output;
+        private Iterator<Tuple> it = null;
+        private Tuple value = null;
+        private boolean init  = false;
+
+        public IllustratorContext(Configuration conf, DataBag input,
+              List<Pair<PigNullableWritable, Writable>> output,
+              InputSplit split) throws IOException, InterruptedException {
+              super(conf, new TaskAttemptID(), null, null, null, null, split);
+              if (output == null)
+                  throw new IOException("Null output can not be used");
+              this.input = input; this.output = output;
+        }
+        
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                if (!init) {
+                    init = true;
+                    return true;
+                }
+                return false;
+            }
+            if (it == null)
+                it = input.iterator();
+            if (!it.hasNext())
+                return false;
+            value = it.next();
+            return true;
+        }
+        
+        @Override
+        public Text getCurrentKey() {
+          return null;
+        }
+        
+        @Override
+        public Tuple getCurrentValue() {
+          return value;
+        }
+        
+        @Override
+        public void write(PigNullableWritable key, Writable value) 
+            throws IOException, InterruptedException {
+            output.add(new Pair<PigNullableWritable, Writable>(key, value));
+        }
+        
+        @Override
+        public void progress() {
+          
+        }
+    }
+}

Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+    public static class Reduce extends PigGenericMapReduce.Reduce {
+        /**
+         * Get reducer's illustrator context
+         * 
+         * @param input Input buffer, as output by the maps
+         * @param pkg the POPackage operator
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override
+        public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+            return new IllustratorContext(job, input, pkg);
+        }
+        
+        @SuppressWarnings("unchecked")
+        public class IllustratorContext extends Context {
+            private PigNullableWritable currentKey = null, nextKey = null;
+            private NullableTuple nextValue = null;
+            private List<NullableTuple> currentValues = null;
+            private Iterator<Pair<PigNullableWritable, Writable>> it;
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            POPackage pack = null;
+
+            public IllustratorContext(Job job,
+                  List<Pair<PigNullableWritable, Writable>> input,
+                  POPackage pkg
+                  ) throws IOException, InterruptedException {
+                super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+                    null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+                
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            try {
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                bos.reset();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+                            } catch (IOException e) {
+                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    nextKey = entry.first;
+                    nextValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+            
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+            
+            @Override
+            public boolean nextKey() {
+                if (nextKey == null)
+                    return false;
+                currentKey = nextKey;
+                currentValues.clear();
+                currentValues.add(nextValue);
+                nextKey = null;
+                for(; it.hasNext(); ) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    /* Why can't raw comparison be used?
+                    byte[] bytes;
+                    int l1, l2;
+                    try {
+                        currentKey.write(dos);
+                        l1 = bos.size();
+                        entry.first.write(dos);
+                        l2 = bos.size();
+                        bytes = bos.toByteArray();
+                    } catch (IOException e) {
+                        throw new RuntimeException("nextKey exception : "+e.getMessage());
+                    }
+                    bos.reset();
+                    if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+                    */
+                    if (groupingComparator.compare(currentKey, entry.first) == 0)
+                    {
+                        currentValues.add((NullableTuple)entry.second);
+                    } else {
+                        nextKey = entry.first;
+                        nextValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+            
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return currentValues;
+            }
+            
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+            
+            @Override
+            public void progress() { 
+            }
+        }
+    }
+}

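Note on the IllustratorContext above: it re-creates the reduce-side shuffle in miniature. The constructor sorts the buffered map output with the job's sort comparator, comparing keys in their serialized form via the shared byte stream, and nextKey() then walks the sorted list, using the grouping comparator (on the deserialized keys, as the commented-out block discusses) to fold consecutive equal keys into a single currentValues batch, just as a real reduce iteration would.
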
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * We need to make Pig work with both Hadoop 20 and Hadoop 23 (PIG-2125). However,
+ * there are API differences between Hadoop 20 and 23, and we use a shims layer to
+ * hide those differences. A dynamic shims layer is not possible due to some
+ * static dependencies, so we adopt a static shims approach: for a different
+ * Hadoop version, we need to recompile.
+ * 
+ * This class wraps all the static methods. PigMapReduce, PigMapBase and MiniCluster wrap the
+ * Hadoop-version-dependent implementations of PigGenericMapReduce, PigGenericMapBase and MiniGenericCluster.
+ **/
+public class HadoopShims {
+    static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
+        JobContext newContext = new JobContext(original.getConfiguration(), original.getJobID());
+        return newContext;
+    }
+    
+    static public TaskAttemptContext createTaskAttemptContext(Configuration conf, 
+                                TaskAttemptID taskId) {
+        TaskAttemptContext newContext = new TaskAttemptContext(conf,
+            taskId);
+        return newContext;
+    }
+    
+    static public JobContext createJobContext(Configuration conf, 
+            JobID jobId) {
+        JobContext newJobContext = new JobContext(
+                conf, jobId);
+        return newJobContext;
+    }
+
+    static public boolean isMap(TaskAttemptID taskAttemptID) {
+        return taskAttemptID.isMap();
+    }
+}

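For illustration, a minimal sketch of how version-neutral Pig code is expected to call this shim instead of constructing Hadoop context objects directly. The ShimCaller class and contextFor method are hypothetical, not part of this commit; only HadoopShims and the Hadoop types are real:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;

    // Hypothetical caller: this code compiles unchanged against either shim
    // tree, because only the shim body differs between the hadoop20 build
    // (new TaskAttemptContext(...)) and the hadoop23 build
    // (new TaskAttemptContextImpl(...)).
    public class ShimCaller {
        public static TaskAttemptContext contextFor(Configuration conf) {
            return HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID());
        }
    }
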
Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+    /**
+     * 
+     * Get mapper's illustrator context
+     * 
+     * @param conf  Configuration
+     * @param input Input bag to serve as data source
+     * @param output Map output buffer
+     * @param split the split
+     * @return Illustrator's context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+          throws IOException, InterruptedException {
+        return new IllustratorContext(conf, input, output, split);
+    }
+    
+    public class IllustratorContext extends Context {
+        private DataBag input;
+        List<Pair<PigNullableWritable, Writable>> output;
+        private Iterator<Tuple> it = null;
+        private Tuple value = null;
+        private boolean init  = false;
+
+        public IllustratorContext(Configuration conf, DataBag input,
+              List<Pair<PigNullableWritable, Writable>> output,
+              InputSplit split) throws IOException, InterruptedException {
+              if (output == null)
+                  throw new IOException("Null output can not be used");
+              this.input = input; this.output = output;
+        }
+        
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                if (!init) {
+                    init = true;
+                    return true;
+                }
+                return false;
+            }
+            if (it == null)
+                it = input.iterator();
+            if (!it.hasNext())
+                return false;
+            value = it.next();
+            return true;
+        }
+        
+        @Override
+        public Text getCurrentKey() {
+          return null;
+        }
+        
+        @Override
+        public Tuple getCurrentValue() {
+          return value;
+        }
+        
+        @Override
+        public void write(PigNullableWritable key, Writable value) 
+            throws IOException, InterruptedException {
+            output.add(new Pair<PigNullableWritable, Writable>(key, value));
+        }
+        
+        @Override
+        public void progress() {
+          
+        }
+
+        @Override
+        public InputSplit getInputSplit() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Counter getCounter(Enum<?> arg0) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Counter getCounter(String arg0, String arg1) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public OutputCommitter getOutputCommitter() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getStatus() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public TaskAttemptID getTaskAttemptID() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public void setStatus(String arg0) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public Path[] getArchiveClassPaths() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public long[] getArchiveTimestamps() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public URI[] getCacheArchives() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public URI[] getCacheFiles() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Credentials getCredentials() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Path[] getFileClassPaths() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public long[] getFileTimestamps() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public RawComparator<?> getGroupingComparator() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getJar() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public JobID getJobID() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getJobName() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public boolean getJobSetupCleanupNeeded() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public Path[] getLocalCacheArchives() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Path[] getLocalCacheFiles() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<?> getMapOutputKeyClass() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<?> getMapOutputValueClass() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public int getMaxMapAttempts() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public int getMaxReduceAttempts() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public int getNumReduceTasks() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<?> getOutputKeyClass() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<?> getOutputValueClass() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public boolean getProfileEnabled() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public String getProfileParams() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public IntegerRanges getProfileTaskRange(boolean arg0) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+                throws ClassNotFoundException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public RawComparator<?> getSortComparator() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public boolean getSymlink() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public String getUser() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Path getWorkingDirectory() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+    }
+}

Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+    public static class Reduce extends PigGenericMapReduce.Reduce {
+        /**
+         * Get reducer's illustrator context
+         * 
+         * @param input Input buffer, as output by the maps
+         * @param pkg the POPackage operator
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override
+        public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+            return new IllustratorContext(job, input, pkg);
+        }
+        
+        @SuppressWarnings("unchecked")
+        public class IllustratorContext extends Context {
+            private PigNullableWritable currentKey = null, nextKey = null;
+            private NullableTuple nextValue = null;
+            private List<NullableTuple> currentValues = null;
+            private Iterator<Pair<PigNullableWritable, Writable>> it;
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            POPackage pack = null;
+
+            public IllustratorContext(Job job,
+                  List<Pair<PigNullableWritable, Writable>> input,
+                  POPackage pkg
+                  ) throws IOException, InterruptedException {
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+                
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            try {
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                bos.reset();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+                            } catch (IOException e) {
+                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    nextKey = entry.first;
+                    nextValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+            
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+            
+            @Override
+            public boolean nextKey() {
+                if (nextKey == null)
+                    return false;
+                currentKey = nextKey;
+                currentValues.clear();
+                currentValues.add(nextValue);
+                nextKey = null;
+                for(; it.hasNext(); ) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    /* Why can't raw comparison be used?
+                    byte[] bytes;
+                    int l1, l2;
+                    try {
+                        currentKey.write(dos);
+                        l1 = bos.size();
+                        entry.first.write(dos);
+                        l2 = bos.size();
+                        bytes = bos.toByteArray();
+                    } catch (IOException e) {
+                        throw new RuntimeException("nextKey exception : "+e.getMessage());
+                    }
+                    bos.reset();
+                    if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+                    */
+                    if (groupingComparator.compare(currentKey, entry.first) == 0)
+                    {
+                        currentValues.add((NullableTuple)entry.second);
+                    } else {
+                        nextKey = entry.first;
+                        nextValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+            
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return currentValues;
+            }
+            
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+            
+            @Override
+            public void progress() { 
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> arg0) {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String arg0, String arg1) {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public NullableTuple getCurrentValue() throws IOException,
+                    InterruptedException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public OutputCommitter getOutputCommitter() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public boolean nextKeyValue() throws IOException,
+                    InterruptedException {
+                // TODO Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public String getStatus() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public TaskAttemptID getTaskAttemptID() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public void setStatus(String arg0) {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public Path[] getArchiveClassPaths() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public long[] getArchiveTimestamps() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public URI[] getCacheArchives() throws IOException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public URI[] getCacheFiles() throws IOException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Credentials getCredentials() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Path[] getFileClassPaths() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public long[] getFileTimestamps() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public RawComparator<?> getGroupingComparator() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public String getJar() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public JobID getJobID() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public String getJobName() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public boolean getJobSetupCleanupNeeded() {
+                // TODO Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public Path[] getLocalCacheArchives() throws IOException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Path[] getLocalCacheFiles() throws IOException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<?> getMapOutputKeyClass() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<?> getMapOutputValueClass() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public int getMaxMapAttempts() {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+
+            @Override
+            public int getMaxReduceAttempts() {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+
+            @Override
+            public int getNumReduceTasks() {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+
+            @Override
+            public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<?> getOutputKeyClass() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<?> getOutputValueClass() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public boolean getProfileEnabled() {
+                // TODO Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public String getProfileParams() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public IntegerRanges getProfileTaskRange(boolean arg0) {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+                    throws ClassNotFoundException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public RawComparator<?> getSortComparator() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public boolean getSymlink() {
+                // TODO Auto-generated method stub
+                return false;
+            }
+
+            @Override
+            public String getUser() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public Path getWorkingDirectory() throws IOException {
+                // TODO Auto-generated method stub
+                return null;
+            }
+        }
+    }
+}

Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.ContextFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public class HadoopShims {
+    static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
+        return ContextFactory.cloneContext(original, original.getConfiguration());
+    }
+
+    static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+                                TaskAttemptID taskId) {
+        return new TaskAttemptContextImpl(conf, taskId);
+    }
+
+    static public JobContext createJobContext(Configuration conf,
+            JobID jobId) {
+        return new JobContextImpl(conf, jobId);
+    }
+
+    static public boolean isMap(TaskAttemptID taskAttemptID) {
+        return taskAttemptID.getTaskType() == TaskType.MAP;
+    }
+}
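
To see how the shim is meant to be consumed, here is a minimal caller sketch (illustrative, not code from this commit): JobContext is a concrete class in Hadoop 0.20 but an interface in 0.23, so call sites go through HadoopShims instead of constructing contexts directly.

    // Hypothetical call site; it compiles against either Hadoop line because the
    // shim hides whether JobContext/TaskAttemptContext are classes or interfaces.
    Configuration conf = new Configuration();
    JobContext jc = HadoopShims.createJobContext(conf, new JobID());
    TaskAttemptContext tac = HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID());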

Added: pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (added)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+public class MiniCluster extends MiniGenericCluster {
+    private MiniMRCluster m_mr = null;
+    public MiniCluster() {
+        super();
+    }
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            final int dataNodes = 4;     // There will be 4 data nodes
+            final int taskTrackers = 4;  // There will be 4 task tracker nodes
+            
+            // Create the configuration hadoop-site.xml file
+            File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+            conf_dir.mkdirs();
+            File conf_file = new File(conf_dir, "hadoop-site.xml");
+            
+            conf_file.delete();
+            
+            // Builds and starts the mini dfs and mapreduce clusters
+            Configuration config = new Configuration();
+            m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+            m_fileSys = m_dfs.getFileSystem();
+            m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
+            
+            // Write the necessary config info to hadoop-site.xml
+            m_conf = m_mr.createJobConf();      
+            m_conf.setInt("mapred.submit.replication", 2);
+            m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+            m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+            m_conf.set("mapred.map.max.attempts", "2");
+            m_conf.set("mapred.reduce.max.attempts", "2");
+            m_conf.writeXml(new FileOutputStream(conf_file));
+            
+            // Set the system properties needed by Pig
+            System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+            System.setProperty("namenode", m_conf.get("fs.default.name"));
+            System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        if (m_mr != null) { m_mr.shutdown(); }
+        m_mr = null;
+    }
+}
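
For reference, a typical Pig test drives this class through the generic base added in this commit; roughly as follows (a sketch assuming the usual test-harness entry points, with buildCluster(), getProperties() and shutDown() provided by MiniGenericCluster):

    // Hypothetical test usage: start the mini DFS + MR clusters once per suite,
    // point a PigServer at them, and tear everything down at the end.
    MiniGenericCluster cluster = MiniCluster.buildCluster();
    PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    // ... run queries ...
    cluster.shutDown();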

Added: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (added)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * This class builds a single instance of itself with the Singleton 
+ * design pattern. While building the single instance, it sets up a 
+ * mini cluster that actually consists of a mini DFS cluster and a 
+ * mini MapReduce cluster on the local machine and also sets up the 
+ * environment for Pig to run on top of the mini cluster.
+ */
+public class MiniCluster extends MiniGenericCluster {
+    protected MiniMRYarnCluster m_mr = null;
+    private Configuration m_dfs_conf = null;
+    private Configuration m_mr_conf = null;
+    
+    public MiniCluster() {
+        super();
+    }
+    
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO);
+        try {
+            final int dataNodes = 4;     // There will be 4 data nodes
+            final int taskTrackers = 4;  // There will be 4 task tracker nodes
+            
+            Logger.getRootLogger().setLevel(Level.TRACE);
+            // Create the configuration hadoop-site.xml file
+            File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+            conf_dir.mkdirs();
+            File conf_file = new File(conf_dir, "hadoop-site.xml");
+            
+            conf_file.delete();
+            
+            // Builds and starts the mini dfs and mapreduce clusters
+            Configuration config = new Configuration();
+            m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+            
+            m_mr = new MiniMRYarnCluster("PigMiniCluster");
+            m_mr.init(new Configuration());
+            m_mr.start();
+
+            // Write the necessary config info to hadoop-site.xml
+            m_mr_conf = new Configuration(m_mr.getConfig());
+            
+            m_conf = m_mr_conf;
+            m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name"));
+
+            // Copy the MapReduce client and Pig jars into HDFS so that tasks
+            // running in the mini YARN cluster can load them
+            m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"));
+            m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"));
+            m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig.jar"), new Path("/pig.jar"));
+            m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig-test.jar"), new Path("/pig-test.jar"));
+
+            // Add the shipped jars to the task classpath via the distributed cache
+            DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), m_conf);
+            DistributedCache.addFileToClassPath(new Path("/pig.jar"), m_conf);
+            DistributedCache.addFileToClassPath(new Path("/pig-test.jar"), m_conf);
+            DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), m_conf);
+
+            
+            m_conf.setInt("mapred.submit.replication", 2);
+            m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+            m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+            m_conf.set("mapred.map.max.attempts", "2");
+            m_conf.set("mapred.reduce.max.attempts", "2");
+            m_conf.writeXml(new FileOutputStream(conf_file));
+            
+            // Set the system properties needed by Pig
+            System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+            System.setProperty("namenode", m_conf.get("fs.default.name"));
+            System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    @Override
+    protected void shutdownMiniMrClusters() {
+        if (m_mr != null) { m_mr.stop(); }     
+        m_mr = null;
+    }
+}
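
One detail in the setup above deserves a note: the jars are first copied into HDFS because DistributedCache.addFileToClassPath() only records a path for the tasks to localize; the file must already be reachable by the cluster. A minimal sketch of the pattern (illustrative; the jar name is a placeholder):

    // Hypothetical dependency: the Path must point into a filesystem the
    // cluster can read (here HDFS), or the task-side classloader gets nothing.
    Configuration conf = new Configuration();
    Path jarInDfs = new Path("/some-dependency.jar");
    DistributedCache.addFileToClassPath(jarInDfs, conf);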

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Tue Jul 19 01:01:53 2011
@@ -25,6 +25,8 @@ import java.util.Properties;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
@@ -65,8 +67,13 @@ public class ConfigurationUtil {
     }
     
     public static Properties getLocalFSProperties() {
-        Configuration localConf = new Configuration(false);
-        localConf.addResource("core-default.xml");
+        Configuration localConf;
+        if (PigMapReduce.sJobContext != null && ExecType.LOCAL.toString().equals(
+                PigMapReduce.sJobContext.getConfiguration().get("exectype"))) {
+            localConf = new Configuration(false);
+            localConf.addResource("core-default.xml");
+        } else {
+            localConf = new Configuration(true);
+        }
         Properties props = ConfigurationUtil.toProperties(localConf);
         props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         return props;
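
To make the two branches concrete (an illustration, not part of the change): new Configuration(false) starts with no resources loaded, so only what is added explicitly is visible, whereas new Configuration(true) picks up Hadoop's default resource chain.

    // Illustrative only: the boolean controls whether default resources
    // (core-default.xml, core-site.xml, ...) are loaded automatically.
    Configuration bare = new Configuration(false);  // local mode: start empty
    bare.addResource("core-default.xml");           // then add core defaults by hand
    Configuration full = new Configuration(true);   // cluster mode: full defaults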

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jul 19 01:01:53 2011
@@ -64,10 +64,11 @@ import org.apache.pig.pen.POOptimizeDisa
 public class HExecutionEngine {
     
     public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
-    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+    private static final String FILE_SYSTEM_LOCATION = "fs.defaultFS";
     
     private static final String HADOOP_SITE = "hadoop-site.xml";
     private static final String CORE_SITE = "core-site.xml";
+    private static final String YARN_SITE = "yarn-site.xml";
     private final Log log = LogFactory.getLog(getClass());
     public static final String LOCAL = "local";
     
@@ -155,6 +156,7 @@ public class HExecutionEngine {
 
             jc = new JobConf();
             jc.addResource("pig-cluster-hadoop-site.xml");
+            jc.addResource(YARN_SITE);
             
             // Trick to invoke static initializer of DistributedFileSystem to add hdfs-default.xml 
             // into configuration

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jul 19 01:01:53 2011
@@ -84,6 +84,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -1354,8 +1355,7 @@ public class MRCompiler extends PhyPlanV
                             Job job = new Job(conf);
                             loader.setLocation(location, job);
                             InputFormat inf = loader.getInputFormat();
-                            List<InputSplit> splits = inf.getSplits(new JobContext(
-                                    job.getConfiguration(), job.getJobID()));
+                            List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
                             List<List<InputSplit>> results = MapRedUtil
                             .getCombinePigSplits(splits, fs
                                     .getDefaultBlockSize(), conf);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 19 01:01:53 2011
@@ -55,6 +55,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -563,7 +565,7 @@ public class MapReduceLauncher extends L
      * @throws IOException 
      */
     private void storeSchema(Job job, POStore st) throws IOException {
-        JobContext jc = new JobContext(job.getJobConf(), 
+        JobContext jc = HadoopShims.createJobContext(job.getJobConf(), 
                 new org.apache.hadoop.mapreduce.JobID());
         JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
         PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Tue Jul 19 01:01:53 2011
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 /**
@@ -58,7 +60,8 @@ public class MapReducePOStoreImpl extend
         // task (map or reduce) we could have multiple stores, we should
         // make this copy so that the same context does not get over-written
         // by the different stores.
-        this.context = new TaskAttemptContext(outputConf, 
+        
+        this.context = HadoopShims.createTaskAttemptContext(outputConf, 
                 context.getTaskAttemptID());
     }
     

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * This is the base class for PigMapBase, which differs slightly across
+ * Hadoop versions. The PigMapBase implementations live under
+ * $PIG_HOME/shims.
+ */
+public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    private static final Tuple DUMMYTUPLE = null;
+
+    private final Log log = LogFactory.getLog(getClass());
+    
+    protected byte keyType;
+        
+    //Map Plan
+    protected PhysicalPlan mp = null;
+
+    // Store operators
+    protected List<POStore> stores;
+
+    protected TupleFactory tf = TupleFactory.getInstance();
+    
+    boolean inIllustrator = false;
+    
+    Context outputCollector;
+    
+    // Reporter that will be used by operators
+    // to transmit heartbeat
+    ProgressableReporter pigReporter;
+
+    protected boolean errorInMap = false;
+    
+    PhysicalOperator[] roots;
+
+    private PhysicalOperator leaf;
+
+    PigContext pigContext = null;
+    private volatile boolean initialized = false;
+    
+    /**
+     * for local map/reduce simulation
+     * @param plan the map plan
+     */
+    public void setMapPlan(PhysicalPlan plan) {
+        mp = plan;
+    }
+    
+    /**
+     * Called once all input tuples have been processed, at which point
+     * the reporter thread should be closed.
+     */
+    @Override
+    public void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        if(errorInMap) {
+            //error in map - returning
+            return;
+        }
+            
+        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
+            // If there is a stream in the pipeline, or if this map job belongs
+            // to a merge-join, there could be more to process: set the flag
+            // stating that all map input has already been sent, then run the
+            // pipeline one more time. This is a no-op when the pipeline
+            // contains neither a stream nor a merge-join.
+            mp.endOfAllInput = true;
+            runPipeline(leaf);
+        }
+
+        for (POStore store: stores) {
+            if (!initialized) {
+                MapReducePOStoreImpl impl 
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                store.setUp();
+            }
+            store.tearDown();
+        }
+        
+        //Calling EvalFunc.finish()
+        UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
+        try {
+            finisher.visit();
+        } catch (VisitorException e) {
+            int errCode = 2121;
+            String msg = "Error while calling finish method on UDFs.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
+        }
+        
+        mp = null;
+
+        PhysicalOperator.setReporter(null);
+        initialized = false;
+    }
+
+    /**
+     * Configures the mapper with the map plan and the
+     * reporter thread.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        
+        Configuration job = context.getConfiguration();
+        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        PigMapReduce.sJobContext = context;
+        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
+        PigMapReduce.sJobConf = context.getConfiguration();
+        inIllustrator = (context instanceof PigMapBase.IllustratorContext);
+        
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
+        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        if (pigContext.getLog4jProperties()!=null)
+            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+        
+        if (mp == null)
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
+        stores = PlanHelper.getStores(mp);
+        
+        // To be removed
+        if(mp.isEmpty())
+            log.debug("Map Plan empty!");
+        else{
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            mp.explain(baos);
+            log.debug(baos.toString());
+        }
+        keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
+        // till here
+        
+        pigReporter = new ProgressableReporter();
+        // Get the UDF specific context
+        MapRedUtil.setupUDFContext(job);
+
+        if(!(mp.isEmpty())) {
+
+            PigSplit split = (PigSplit)context.getInputSplit();
+            List<OperatorKey> targetOpKeys = split.getTargetOps();
+            
+            ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
+            for (OperatorKey targetKey : targetOpKeys) {                    
+                targetOpsAsList.add(mp.getOperator(targetKey));
+            }
+            roots = targetOpsAsList.toArray(new PhysicalOperator[targetOpsAsList.size()]);
+            leaf = mp.getLeaves().get(0);               
+        }
+        
+        PigStatusReporter.setContext(context);
+ 
+    }
+    
+    /**
+     * The map function attaches inpTuple to the plan roots and executes
+     * the map plan if it is not empty; if the plan is empty, the input
+     * is collected directly. The collection itself is left abstract for
+     * the map-only or map-reduce job to implement: map-only collects the
+     * tuple as-is, whereas map-reduce collects it after extracting the
+     * key and the indexed tuple.
+     */
+    @Override
+    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {     
+        if(!initialized) {
+            initialized  = true;
+            // cache the collector for use in runPipeline(), which
+            // can also be called from cleanup()
+            this.outputCollector = context;
+            pigReporter.setRep(context);
+            PhysicalOperator.setReporter(pigReporter);
+           
+            for (POStore store: stores) {
+                MapReducePOStoreImpl impl 
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                if (!pigContext.inIllustrator)
+                    store.setUp();
+            }
+            
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+            pigHadoopLogger.setAggregate(aggregateWarning);           
+            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+            PhysicalOperator.setPigLogger(pigHadoopLogger);
+        }
+        
+        if (mp.isEmpty()) {
+            collect(context,inpTuple);
+            return;
+        }
+        
+        for (PhysicalOperator root : roots) {
+            if (inIllustrator) {
+                if (root != null) {
+                    root.attachInput(inpTuple);
+                }
+            } else {
+                root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+            }
+        }
+            
+        runPipeline(leaf);
+    }
+
+    protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
+        while(true){
+            Result res = leaf.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                collect(outputCollector,(Tuple)res.result);
+                continue;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return;
+            }
+            
+            if(res.returnStatus==POStatus.STATUS_NULL)
+                continue;
+            
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                // remember that we had an issue so that in
+                // cleanup() we can do the right thing
+                errorInMap  = true;
+                // If there is an error message, use it.
+                String errMsg;
+                if(res.result != null) {
+                    errMsg = "Received Error while " +
+                    "processing the map plan: " + res.result;
+                } else {
+                    errMsg = "Received Error while " +
+                    "processing the map plan.";
+                }
+                    
+                int errCode = 2055;
+                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+                throw ee;
+            }
+        }
+        
+    }
+
+    abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
+
+    /**
+     * @return the keyType
+     */
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    /**
+     * @param keyType the keyType to set
+     */
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+    
+    abstract public Context getIllustratorContext(Configuration conf, DataBag input,
+            List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+            throws IOException, InterruptedException;
+}
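
For orientation, the concrete subclasses generated per Hadoop version differ mainly in collect(). A minimal sketch of the map-only variant (hedged; the real implementations are the PigMapBase shims added elsewhere in this commit):

    // Sketch of a map-only collect(): no shuffle key needs to be extracted,
    // so the tuple can be emitted as-is with a null key (Tuple is Writable).
    @Override
    public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException {
        oc.write(null, tuple);
    }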