You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/30 22:04:31 UTC

svn commit: r1519062 [1/5] - in /pig/trunk: ./ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ s...

Author: cheolsoo
Date: Fri Aug 30 20:04:29 2013
New Revision: 1519062

URL: http://svn.apache.org/r1519062
Log:
PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)

Added:
    pig/trunk/src/META-INF/services/org.apache.pig.ExecType
    pig/trunk/src/org/apache/pig/ExecTypeProvider.java
    pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
    pig/trunk/test/org/apache/pig/test/TestMRExecutionEngine.java
    pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
Removed:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java
    pig/trunk/test/org/apache/pig/test/TestHExecutionEngine.java
    pig/trunk/test/org/apache/pig/test/TestJobStats.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/ExecType.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
    pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
    pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/test/org/apache/pig/test/TestCounters.java
    pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
    pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
    pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/TestNumberOfReducers.java
    pig/trunk/test/org/apache/pig/test/TestParser.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPigSplit.java
    pig/trunk/test/org/apache/pig/test/TestPigStats.java
    pig/trunk/test/org/apache/pig/test/TestProjectRange.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Aug 30 20:04:29 2013
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
 
 INCOMPATIBLE CHANGES
 
+PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
+
 PIG-3191: [piggybank] MultiStorage output filenames are not sortable (Danny Antonelli via jcoveney)
 
 PIG-3174: Remove rpm and deb artifacts from build.xml (gates)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Aug 30 20:04:29 2013
@@ -527,6 +527,9 @@
         <echo>*** Else, you will only be warned about deprecations ***</echo>
         <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}"
             excludes="" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
+        <copy todir="${build.classes}/META-INF">
+            <fileset dir="${src.dir}/META-INF" includes="**"/>
+        </copy>
     </target>
 
     <target name="compile-test" depends="compile, ivy-test">
@@ -536,7 +539,6 @@
         <compileSources sources="${test.src.dir};${src.shims.test.dir}"
             excludes="**/PigTestLoader.java **/resources/** perf/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
         <copy file="${basedir}/test/hbase-site.xml" tofile="${test.build.classes}/hbase-site.xml"/>
-
         <ivy:cachepath pathid="mr-apps-test-ivy.classpath" conf="test" />
         <path id="mr-apps-test.classpath">
             <fileset dir="${clover.home}" erroronmissingdir="false">

Added: pig/trunk/src/META-INF/services/org.apache.pig.ExecType
URL: http://svn.apache.org/viewvc/pig/trunk/src/META-INF/services/org.apache.pig.ExecType?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/META-INF/services/org.apache.pig.ExecType (added)
+++ pig/trunk/src/META-INF/services/org.apache.pig.ExecType Fri Aug 30 20:04:29 2013
@@ -0,0 +1,16 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType
+

Modified: pig/trunk/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ExecType.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/ExecType.java (original)
+++ pig/trunk/src/org/apache/pig/ExecType.java Fri Aug 30 20:04:29 2013
@@ -19,36 +19,70 @@
 package org.apache.pig;
 
 import java.io.Serializable;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType;
+import org.apache.pig.impl.PigContext;
 
 /**
- * The type of query execution
+ * The type of query execution. Pig will cycle through all implementations
+ * of ExecType and choose the first one that matches the Properties passed in.
+ * This ExecType then dictates the ExecutionEngine used for processing and 
+ * other behaviour throughout Pig. Any implementing classes should be noted in
+ * the META-INF/services folder titled org.apache.pig.ExecType as per the 
+ * Java ServiceLoader specification.
  */
-public enum ExecType implements Serializable {
+public interface ExecType extends Serializable {
+
+    public static final ExecType LOCAL = new LocalExecType();
+    public static final ExecType MAPREDUCE = new MRExecType();
+
+    /**
+     * An ExecType is selected based off the Properties for the given script.
+     * There may be multiple settings that trigger the selection of a given
+     * ExecType. For example, distributed MR mode is currently triggered if the
+     * user specifies "mapred" or "mapreduce". It is desirable to override the
+     * toString method of the given ExecType to uniquely identify the ExecType.
+     * 
+     * The initialize method should return true if it accepts the properties or
+     * false if it does not. The Java ServiceLoader framework will be used to
+     * iterate through and select the correct ExecType.
+     */
+
+    public boolean accepts(Properties properties);
+
     /**
-     * Run everything on the local machine
+     * Returns the Execution Engine that this ExecType is associated with. Once
+     * the ExecType the script is running in is determined by the PigServer, it
+     * will then call this method to get an instance of the ExecutionEngine
+     * associated with this ExecType to delegate all further execution to on the
+     * backend.
      */
-    LOCAL,
+    public ExecutionEngine getExecutionEngine(PigContext pigContext);
+
     /**
-     * Use the Hadoop Map/Reduce framework
+     * Returns the Execution Engine class that this ExecType is associated with.
+     * This method simply returns the class of the ExecutionEngine associated
+     * with this ExecType, and not an instance of it.
      */
-    MAPREDUCE;
+    public Class<? extends ExecutionEngine> getExecutionEngineClass();
 
     /**
-     * Given a string, determine the exec type.
-     * @param execString accepted values are 'local', 'mapreduce', and 'mapred'
-     * @return exectype as ExecType
+     * An ExecType is classified as local if it runs in-process and through the
+     * local filesystem. While an ExecutionEngine may have a more nuanced notion
+     * of local mode, these are the qualifications Pig requires.
+     * ExecutionEngines can extend the ExecType interface to additionally
+     * differentiate between ExecTypes as necessary.
      */
-    public static ExecType fromString(String execString) throws PigException {
-        if (execString.equals("mapred")) {
-            return MAPREDUCE;
-        } else {
-            try {
-                return ExecType.valueOf(execString.toUpperCase());
-            } catch (IllegalArgumentException e) {
-                int errCode = 2040;
-                String msg = "Unknown exec type: " + execString;
-                throw new PigException(msg, errCode, e);
-            }
-        }
-    }
+    public boolean isLocal();
+    
+    
+    /** 
+     * Returns the canonical name for this ExecType.
+     * @return
+     */
+    public String name();
+
 }

Added: pig/trunk/src/org/apache/pig/ExecTypeProvider.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ExecTypeProvider.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ExecTypeProvider.java (added)
+++ pig/trunk/src/org/apache/pig/ExecTypeProvider.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType;
+import org.apache.pig.impl.util.PropertiesUtil;
+
+public class ExecTypeProvider {
+
+    private static final Log log = LogFactory.getLog(ExecTypeProvider.class);
+
+    public static ExecType selectExecType(Properties properties)
+            throws PigException {
+
+        ServiceLoader<ExecType> frameworkLoader = ServiceLoader
+                .load(ExecType.class);
+
+        for (ExecType execType : frameworkLoader) {
+            log.info("Trying ExecType : " + execType);
+            if (execType.accepts(properties)) {
+                log.debug("Picked " + execType + " as the ExecType");
+                return getSingleton(execType);
+            } else {
+                log.debug("Cannot pick " + execType + " as the ExecType");
+            }
+        }
+        throw new PigException("Unknown exec type: "
+                + properties.getProperty("exectype"), 2040);
+    }
+
+    /**
+     * This method attempts to return a singleton instance of the given exec
+     * type. Only works for MR ExecTypes as these are the only ExecTypes that we
+     * have constants in the Pig codebase for.
+     * 
+     * @param execType
+     * @return
+     */
+    private static ExecType getSingleton(ExecType execType) {
+        if (execType instanceof MRExecType) {
+            return ExecType.MAPREDUCE;
+        }
+        if (execType instanceof LocalExecType) {
+            return ExecType.LOCAL;
+        }
+        // if it is not MR specific but rather a different
+        // execution engine, we don't have access to any
+        // constants that can act as singletons, so we just
+        // use the given instance
+        return execType;
+    }
+
+    public static ExecType fromString(String execType) throws PigException {
+        Properties properties = PropertiesUtil.loadDefaultProperties();
+        properties.setProperty("exectype", execType);
+        return selectExecType(properties);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Aug 30 20:04:29 2013
@@ -211,12 +211,7 @@ static int run(String args[], PigProgres
 
         ExecMode mode = ExecMode.UNKNOWN;
         String file = null;
-        String engine = null;
-        ExecType execType = ExecType.MAPREDUCE ;
-        String execTypeString = properties.getProperty("exectype");
-        if(execTypeString!=null && execTypeString.length()>0){
-            execType = ExecType.fromString(execTypeString);
-        }
+        String engine = null;        
 
         // set up client side system properties in UDF context
         UDFContext.getUDFContext().setClientSystemProps(properties);
@@ -328,13 +323,9 @@ static int run(String args[], PigProgres
                 break;
 
             case 'x':
-                try {
-                    execType = ExecType.fromString(opts.getValStr());
-                    } catch (IOException e) {
-                        throw new RuntimeException("ERROR: Unrecognized exectype.", e);
-                    }
+                properties.setProperty("exectype", opts.getValStr());
                 break;
-
+                
             case 'P':
             {
                 InputStream inputStream = null;
@@ -361,8 +352,9 @@ static int run(String args[], PigProgres
                      }
             }
         }
+        
         // create the context with the parameter
-        PigContext pigContext = new PigContext(execType, properties);
+        PigContext pigContext = new PigContext(properties);
 
         // create the static script state object
         String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
@@ -460,7 +452,6 @@ static int run(String args[], PigProgres
             pigContext.getProperties().setProperty(PigContext.JOB_NAME,
                                                    "PigLatin:" +new File(file).getName()
             );
-
             if (!debug) {
                 new File(substFile).deleteOnExit();
             }

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Aug 30 20:04:29 2013
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -54,10 +53,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -125,7 +120,7 @@ public class PigServer {
     public static final String PRETTY_PRINT_SCHEMA_PROPERTY = "pig.pretty.print.schema";
     private static final String PIG_LOCATION_CHECK_STRICT = "pig.location.check.strict";
 
-    /*
+    /*    
      * The data structure to support grunt shell operations.
      * The grunt shell can only work on one graph at a time.
      * If a script is contained inside another script, the grunt
@@ -174,12 +169,23 @@ public class PigServer {
     /**
      * @param execTypeString can be 'mapreduce' or 'local'.  Local mode will
      * use Hadoop's local job runner to execute the job on the local machine.
-     * Mapreduce mode will connect to a cluster to execute the job.
-     * @throws ExecException
+     * Mapreduce mode will connect to a cluster to execute the job. If 
+     * execTypeString is not one of these two, Pig will deduce the ExecutionEngine
+     * if it is on the classpath and use it for the backend execution.
+     * @throws ExecException 
      * @throws IOException
      */
     public PigServer(String execTypeString) throws ExecException, IOException {
-        this(ExecType.fromString(execTypeString));
+        this(addExecTypeProperty(PropertiesUtil.loadDefaultProperties(), execTypeString));
+    }
+
+    public PigServer(Properties properties) throws ExecException, IOException {
+        this(new PigContext(properties));
+    }
+
+    private static Properties addExecTypeProperty(Properties properties, String execType) {
+        properties.setProperty("exectype", execType);
+        return properties;
     }
 
     /**
@@ -992,7 +998,7 @@ public class PigServer {
      */
     public void explain(String alias,
                         PrintStream stream) throws IOException {
-        explain(alias, "text", true, false, stream, stream, stream);
+        explain(alias, "text", true, false, stream, stream, null, null);
     }
 
     /**
@@ -1006,8 +1012,9 @@ public class PigServer {
      * call to execute in the respoect that all the pending stores are
      * marked as complete.
      * @param lps Stream to print the logical tree
-     * @param pps Stream to print the physical tree
-     * @param eps Stream to print the execution tree
+     * @param eps Stream to print the ExecutionEngine trees. If null, then will print to files
+     * @param dir Directory to print ExecutionEngine trees. If null, will use eps
+     * @param suffix Suffix of file names 
      * @throws IOException if the requested alias cannot be found.
      */
     @SuppressWarnings("unchecked")
@@ -1016,25 +1023,20 @@ public class PigServer {
                         boolean verbose,
                         boolean markAsExecute,
                         PrintStream lps,
-                        PrintStream pps,
-                        PrintStream eps) throws IOException {
+                        PrintStream eps,
+                        File dir,
+                        String suffix) throws IOException {
         try {
             pigContext.inExplain = true;
             buildStorePlan( alias );
             if( currDAG.lp.size() == 0 ) {
                 lps.println("Logical plan is empty.");
-                pps.println("Physical plan is empty.");
-                eps.println("Execution plan is empty.");
                 return;
+            } else {
+                currDAG.lp.explain(lps, format, verbose);
             }
-            PhysicalPlan pp = compilePp();
-            currDAG.lp.explain(lps, format, verbose);
-
-            pp.explain(pps, format, verbose);
 
-            MapRedUtil.checkLeafIsStore(pp, pigContext);
-            MapReduceLauncher launcher = new MapReduceLauncher();
-            launcher.explain(pp, pigContext, eps, format, verbose);
+            pigContext.getExecutionEngine().explain(currDAG.lp, pigContext, eps, format, verbose, dir, suffix );
 
             if (markAsExecute) {
                 currDAG.markAsExecuted();
@@ -1058,7 +1060,7 @@ public class PigServer {
      * @throws IOException
      */
     public long capacity() throws IOException {
-        if (pigContext.getExecType() == ExecType.LOCAL) {
+        if (pigContext.getExecType().isLocal()) {
             throw new IOException("capacity only supported for non-local execution");
         }
         else {
@@ -1290,9 +1292,8 @@ public class PigServer {
     private PigStats executeCompiledLogicalPlan() throws ExecException, FrontendException {
         // discover pig features used in this script
         ScriptState.get().setScriptFeatures( currDAG.lp );
-        PhysicalPlan pp = compilePp();
 
-        return launchPlan(pp, "job_pigexec_");
+        return launchPlan(currDAG.lp, "job_pigexec_");
     }
 
     /**
@@ -1303,52 +1304,23 @@ public class PigServer {
      * @throws ExecException
      * @throws FrontendException
      */
-    protected PigStats launchPlan(PhysicalPlan pp, String jobName) throws ExecException, FrontendException {
-        MapReduceLauncher launcher = new MapReduceLauncher();
+    protected PigStats launchPlan(LogicalPlan lp, String jobName) throws ExecException, FrontendException {
+
         PigStats stats = null;
         try {
-            stats = launcher.launchPig(pp, jobName, pigContext);
+            stats = pigContext.getExecutionEngine().launchPig(lp, jobName, pigContext);
+        } catch (ExecException e) {
+            throw (ExecException) e;
+        } catch (FrontendException e) {
+            throw (FrontendException) e;
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher.  If this
             // is an ExecException, just let it through.  Else wrap it.
-            if (e instanceof ExecException){
-                throw (ExecException)e;
-            } else if (e instanceof FrontendException) {
-                throw (FrontendException)e;
-            } else {
-                int errCode = 2043;
-                String msg = "Unexpected error during execution.";
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        } finally {
-            launcher.reset();
+            int errCode = 2043;
+            String msg = "Unexpected error during execution.";
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
 
-        for (OutputStats output : stats.getOutputStats()) {
-            if (!output.isSuccessful()) {
-                POStore store = output.getPOStore();
-                try {
-                    store.getStoreFunc().cleanupOnFailure(
-                            store.getSFile().getFileName(),
-                            new Job(output.getConf()));
-                } catch (IOException e) {
-                    throw new ExecException(e);
-                }
-            } else {
-                POStore store = output.getPOStore();
-                try {
-                    store.getStoreFunc().cleanupOnSuccess(
-                            store.getSFile().getFileName(),
-                            new Job(output.getConf()));
-                } catch (IOException e) {
-                    throw new ExecException(e);
-                } catch (AbstractMethodError nsme) {
-                    // Just swallow it.  This means we're running against an
-                    // older instance of a StoreFunc that doesn't implement
-                    // this method.
-                }
-            }
-        }
         return stats;
     }
 
@@ -1363,11 +1335,6 @@ public class PigServer {
         return currDAG.lp;
     }
 
-    private PhysicalPlan compilePp() throws FrontendException {
-        // translate lp to physical plan
-        return pigContext.getExecutionEngine().compile( currDAG.lp, null );
-    }
-
     private LogicalRelationalOperator getOperatorForAlias(String alias) throws IOException {
         buildStorePlan (alias);
         LogicalRelationalOperator op = (LogicalRelationalOperator)currDAG.getOperator( alias );

Added: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (added)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.executionengine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+/**
+ * 
+ * The main interface bridging the front end and back end of Pig. This allows Pig
+ * to be ran on multiple Execution Engines, and not being limited to only Hadoop
+ * MapReduce. The ExecutionEngines must support the following methods as these
+ * are all the access points for the Pig frontend for processing. Traditionally
+ * there is one ExecutionEngine created per processing job, but this is not necessary.
+ * The ExecutionEngine instance comes from the ExecType, and it can choose to reuse
+ * ExecutionEngine instances. All specifications for methods are listed below
+ * as well as expected behavior, and the ExecutionEngine must conform to these.
+ * 
+ */
+
+
+public interface ExecutionEngine {
+
+    /**
+     * This method is responsible for the initialization of the ExecutionEngine.
+     * All necessary setup tasks and configuration should execute in the init()
+     * method. This method will be called via the PigContext object.
+     */
+    public void init() throws ExecException;
+
+    /**
+     * Responsible for updating the properties for the ExecutionEngine. The
+     * update may require reinitialization of the engine, perhaps through
+     * another call to init() if appropriate. This decision is delegated to the
+     * ExecutionEngine -- that is, the caller will not call init() after
+     * updating the properties.
+     * 
+     * The Properties passed in should replace any configuration that occurred
+     * from previous Properties object. The Properties object should also be
+     * updated to reflect the deprecation/modifications that the ExecutionEngine
+     * may trigger.
+     * 
+     * @param newConfiguration -- Properties object holding all configuration vals
+     */
+    public void setConfiguration(Properties newConfiguration)
+            throws ExecException;
+
+    /**
+     * Responsible for setting a specific property and value. This method may be
+     * called as a result of a user "SET" command in the script or elsewhere in
+     * Pig to set certain properties.
+     * 
+     * The properties object of the PigContext should be updated with the
+     * property and value with deprecation/other configuration done by the
+     * ExecutionEngine reflected. The ExecutionEngine should also update its
+     * internal configuration view as well.
+     * 
+     * @param property to update
+     * @param value to set for property
+     */
+    public void setProperty(String property, String value);
+
+    /**
+     * Returns the Properties representation of the ExecutionEngine
+     * configuration. The Properties object returned does not have to be the
+     * same object between distinct calls to getConfiguration(). The ExecutionEngine 
+     * may create a new Properties object populated with all the properties
+     * each time.
+     */
+    public Properties getConfiguration();
+
+    /**
+     * This method is responsible for the actual execution of a LogicalPlan. No
+     * assumptions is made about the architecture of the ExecutionEngine, except
+     * that it is capable of executing the LogicalPlan representation of a
+     * script. The ExecutionEngine should take care of all cleanup after
+     * executing the logical plan and returns an implementation of PigStats that
+     * contains the relevant information/statistics of the execution of the
+     * script.
+     * 
+     * @param lp -- plan to compile
+     * @param grpName -- group name for submission
+     * @param pc -- context for execution
+     * @throws ExecException 
+     */
+    public PigStats launchPig(LogicalPlan lp, String grpName, PigContext pc)
+            throws FrontendException, ExecException;
+
+    /**
+     * This method handles the backend processing of the Explain command. Once
+     * again, no assumptions is made about the architecture of the
+     * ExecutionEngine, except that it is capable of "explaining" the
+     * LogicalPlan representation of a script. The ExecutionEngine should print
+     * all of it's explain statements in the PrintStream provided UNLESS the
+     * File object is NOT null. In that case, the file is the directory for
+     * which the ExecutionEngine must write out the explain statements into
+     * semantically distinct files. For example, if the ExecutionEngine operates
+     * on a PhysicalPlan and an ExecutionPlan then there would be two separate
+     * files detailing each. The suffix param indicates the suffix of each of
+     * these file names.
+     * 
+     * @param lp -- plan to explain
+     * @param pc -- context for explain processing
+     * @param ps -- print stream to write all output to (if dir param is null)
+     * @param format -- format to print explain 
+     * @param verbose 
+     * @param dir -- directory to write output to. if not null, write to files
+     * @param suffix -- if writing to files, suffix to be used for each file
+     * 
+     * 
+     * @throws PlanException
+     * @throws VisitorException
+     * @throws IOException
+     */
+    public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
+            String format, boolean verbose, File dir, String suffix)
+            throws PlanException, VisitorException, IOException;
+
+    /**
+     * Returns the DataStorage the ExecutionEngine is using.
+     * 
+     * @return DataStorage the ExecutionEngine is using.
+     */
+    public DataStorage getDataStorage();
+
+    /**
+     * Creates a ScriptState object which will be accessible as a ThreadLocal
+     * variable inside the ScriptState class. This method is called when first
+     * initializing the ScriptState as to delegate to the ExecutionEngine the
+     * version of ScriptState to use to manage the execution at hand.
+     * 
+     * @return ScriptState object to manage execution of the script
+     */
+    public ScriptState instantiateScriptState();
+
+    /**
+     * Returns the ExecutableManager to be used in Pig Streaming.
+     * 
+     * @return ExecutableManager to be used in Pig Streaming.
+     */
+    public ExecutableManager getExecutableManager();
+
+    /**
+     * This method is called whenever a kill signal has been triggered in the
+     * current process, indicating that the user wishes to kill and exit the
+     * current processing.. It is necessary to implement this method to ensure
+     * that we kill running jobs, and any other necessary cleanup.
+     */
+    public void kill() throws BackendException;
+
+    /**
+     * This method is called when a user requests to kill a job associated with
+     * the given job id. If it is not possible for a user to kill a job, throw a
+     * exception. It is imperative for the job id's being displayed to be unique
+     * such that the correct jobs are being killed when the user supplies the
+     * id.
+     * 
+     * @throws BackendException
+     */
+    public void killJob(String jobID) throws BackendException;
+
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigStats;
+
+
+/**
+ * 
+ * Provides core processing implementation for the backend of Pig 
+ * if ExecutionEngine chosen decides to delegate it's work to this class.
+ * Also contains set of utility methods, including ones centered around 
+ * Hadoop.
+ * 
+ */
+public abstract class Launcher {
+    private static final Log log = LogFactory.getLog(Launcher.class);
+
+    long totalHadoopTimeSpent;
+    String newLine = "\n";
+    boolean pigException = false;
+    boolean outOfMemory = false;
+    static final String OOM_ERR = "OutOfMemoryError";
+
+    protected Launcher() {
+        totalHadoopTimeSpent = 0;
+        // handle the windows portion of \r
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+            newLine = "\r\n";
+        }
+        reset();
+    }
+
+    /**
+     * Resets the state after a launch
+     */
+    public void reset() {
+
+    }
+
+    /**
+     * Method to launch pig for hadoop either for a cluster's job tracker or for
+     * a local job runner. THe only difference between the two is the job
+     * client. Depending on the pig context the job client will be initialize to
+     * one of the two. Launchers for other frameworks can overide these methods.
+     * Given an input PhysicalPlan, it compiles it to get a MapReduce Plan. The
+     * MapReduce plan which has multiple MapReduce operators each one of which
+     * has to be run as a map reduce job with dependency information stored in
+     * the plan. It compiles the MROperPlan into a JobControl object. Each Map
+     * Reduce operator is converted into a Job and added to the JobControl
+     * object. Each Job also has a set of dependent Jobs that are created using
+     * the MROperPlan. The JobControl object is obtained from the
+     * JobControlCompiler Then a new thread is spawned that submits these jobs
+     * while respecting the dependency information. The parent thread monitors
+     * the submitted jobs' progress and after it is complete, stops the
+     * JobControl thread.
+     * 
+     * @param php
+     * @param grpName
+     * @param pc
+     * @throws PlanException
+     * @throws VisitorException
+     * @throws IOException
+     * @throws ExecException
+     * @throws JobCreationException
+     */
+    public abstract PigStats launchPig(PhysicalPlan php, String grpName,
+            PigContext pc) throws Exception;
+
+    /**
+     * Explain how a pig job will be executed on the underlying infrastructure.
+     * 
+     * @param pp
+     *            PhysicalPlan to explain
+     * @param pc
+     *            PigContext to use for configuration
+     * @param ps
+     *            PrintStream to write output on.
+     * @param format
+     *            Format to write in
+     * @param verbose
+     *            Amount of information to print
+     * @throws VisitorException
+     * @throws IOException
+     */
+    public abstract void explain(PhysicalPlan pp, PigContext pc,
+            PrintStream ps, String format, boolean verbose)
+            throws PlanException, VisitorException, IOException;
+
+    public abstract void kill() throws BackendException;
+
+    public abstract void killJob(String jobID, JobConf jobConf)
+            throws BackendException;
+
+    protected boolean isComplete(double prog) {
+        return (int) (Math.ceil(prog)) == 1;
+    }
+
+    protected void getStats(Job job, JobClient jobClient, boolean errNotDbg,
+            PigContext pigContext) throws ExecException {
+        JobID MRJobID = job.getAssignedJobID();
+        String jobMessage = job.getMessage();
+        Exception backendException = null;
+        if (MRJobID == null) {
+            try {
+                LogUtils.writeLog(
+                        "Backend error message during job submission",
+                        jobMessage,
+                        pigContext.getProperties().getProperty("pig.logfile"),
+                        log);
+                backendException = getExceptionFromString(jobMessage);
+            } catch (Exception e) {
+                int errCode = 2997;
+                String msg = "Unable to recreate exception from backend error: "
+                        + jobMessage;
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+            throw new ExecException(backendException);
+        }
+        try {
+            TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
+            getErrorMessages(mapRep, "map", errNotDbg, pigContext);
+            totalHadoopTimeSpent += computeTimeSpent(mapRep);
+            mapRep = null;
+            TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
+            getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
+            totalHadoopTimeSpent += computeTimeSpent(redRep);
+            redRep = null;
+        } catch (IOException e) {
+            if (job.getState() == Job.SUCCESS) {
+                // if the job succeeded, let the user know that
+                // we were unable to get statistics
+                log.warn("Unable to get job related diagnostics");
+            } else {
+                throw new ExecException(e);
+            }
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    protected long computeTimeSpent(TaskReport[] taskReports) {
+        long timeSpent = 0;
+        for (TaskReport r : taskReports) {
+            timeSpent += (r.getFinishTime() - r.getStartTime());
+        }
+        return timeSpent;
+    }
+
+    protected void getErrorMessages(TaskReport reports[], String type,
+            boolean errNotDbg, PigContext pigContext) throws Exception {
+        for (int i = 0; i < reports.length; i++) {
+            String msgs[] = reports[i].getDiagnostics();
+            ArrayList<Exception> exceptions = new ArrayList<Exception>();
+            String exceptionCreateFailMsg = null;
+            boolean jobFailed = false;
+            float successfulProgress = 1.0f;
+            if (msgs.length > 0) {
+                // if the progress reported is not 1.0f then the map or reduce
+                // job failed
+                // this comparison is in place till Hadoop 0.20 provides methods
+                // to query
+                // job status
+                if (reports[i].getProgress() != successfulProgress) {
+                    jobFailed = true;
+                }
+                Set<String> errorMessageSet = new HashSet<String>();
+                for (int j = 0; j < msgs.length; j++) {
+                    if (!errorMessageSet.contains(msgs[j])) {
+                        errorMessageSet.add(msgs[j]);
+                        if (errNotDbg) {
+                            // errNotDbg is used only for failed jobs
+                            // keep track of all the unique exceptions
+                            try {
+                                LogUtils.writeLog("Backend error message",
+                                        msgs[j], pigContext.getProperties()
+                                                .getProperty("pig.logfile"),
+                                        log);
+                                Exception e = getExceptionFromString(msgs[j]);
+                                exceptions.add(e);
+                            } catch (Exception e1) {
+                                exceptionCreateFailMsg = msgs[j];
+
+                            }
+                        } else {
+                            log.debug("Error message from task (" + type + ") "
+                                    + reports[i].getTaskID() + msgs[j]);
+                        }
+                    }
+                }
+            }
+            // if there are no valid exception that could be created, report
+            if ((exceptions.size() == 0) && (exceptionCreateFailMsg != null)) {
+                int errCode = 2997;
+                String msg = "Unable to recreate exception from backed error: "
+                        + exceptionCreateFailMsg;
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+
+            // if its a failed job then check if there is more than one
+            // exception
+            // more than one exception implies possibly different kinds of
+            // failures
+            // log all the different failures and throw the exception
+            // corresponding
+            // to the first failure
+            if (jobFailed) {
+                if (exceptions.size() > 1) {
+                    for (int j = 0; j < exceptions.size(); ++j) {
+                        String headerMessage = "Error message from task ("
+                                + type + ") " + reports[i].getTaskID();
+                        LogUtils.writeLog(exceptions.get(j), pigContext
+                                .getProperties().getProperty("pig.logfile"),
+                                log, false, headerMessage, false, false);
+                    }
+                    throw exceptions.get(0);
+                } else if (exceptions.size() == 1) {
+                    throw exceptions.get(0);
+                } else {
+                    int errCode = 2115;
+                    String msg = "Internal error. Expected to throw exception from the backend. Did not find any exception to throw.";
+                    throw new ExecException(msg, errCode, PigException.BUG);
+                }
+            }
+        }
+    }
+
+    /**
+     * Compute the progress of the current job submitted through the JobControl
+     * object jc to the JobClient jobClient
+     * 
+     * @param jc
+     *            - The JobControl object that has been submitted
+     * @param jobClient
+     *            - The JobClient to which it has been submitted
+     * @return The progress as a precentage in double format
+     * @throws IOException
+     */
+    protected double calculateProgress(JobControl jc, JobClient jobClient)
+            throws IOException {
+        double prog = 0.0;
+        prog += jc.getSuccessfulJobs().size();
+
+        List<Job> runnJobs = jc.getRunningJobs();
+        for (Object object : runnJobs) {
+            Job j = (Job) object;
+            prog += progressOfRunningJob(j, jobClient);
+        }
+        return prog;
+    }
+
+    /**
+     * Returns the progress of a Job j which is part of a submitted JobControl
+     * object. The progress is for this Job. So it has to be scaled down by the
+     * num of jobs that are present in the JobControl.
+     * 
+     * @param j
+     *            - The Job for which progress is required
+     * @param jobClient
+     *            - the JobClient to which it has been submitted
+     * @return Returns the percentage progress of this Job
+     * @throws IOException
+     */
+    protected double progressOfRunningJob(Job j, JobClient jobClient)
+            throws IOException {
+        JobID mrJobID = j.getAssignedJobID();
+        RunningJob rj = jobClient.getJob(mrJobID);
+        if (rj == null && j.getState() == Job.SUCCESS)
+            return 1;
+        else if (rj == null)
+            return 0;
+        else {
+            double mapProg = rj.mapProgress();
+            double redProg = rj.reduceProgress();
+            return (mapProg + redProg) / 2;
+        }
+    }
+
+    public long getTotalHadoopTimeSpent() {
+        return totalHadoopTimeSpent;
+    }
+
+    /**
+     * 
+     * @param stackTraceLine
+     *            The string representation of
+     *            {@link Throwable#printStackTrace() printStackTrace} Handles
+     *            internal PigException and its subclasses that override the
+     *            {@link Throwable#toString() toString} method
+     * @return An exception object whose string representation of
+     *         printStackTrace is the input stackTrace
+     * @throws Exception
+     */
+    public Exception getExceptionFromString(String stackTrace) throws Exception {
+        String[] lines = stackTrace.split(newLine);
+        Throwable t = getExceptionFromStrings(lines, 0);
+
+        if (!pigException) {
+            int errCode = 6015;
+            String msg = "During execution, encountered a Hadoop error.";
+            ExecException ee = new ExecException(msg, errCode,
+                    PigException.REMOTE_ENVIRONMENT, t);
+            ee.setStackTrace(t.getStackTrace());
+            return ee;
+        } else {
+            pigException = false;
+            if (outOfMemory) {
+                outOfMemory = false;
+                int errCode = 6016;
+                String msg = "Out of memory.";
+                ExecException ee = new ExecException(msg, errCode,
+                        PigException.REMOTE_ENVIRONMENT, t);
+                ee.setStackTrace(t.getStackTrace());
+                return ee;
+            }
+            return (Exception) t;
+        }
+    }
+
+    /**
+     * 
+     * @param stackTraceLine
+     *            An array of strings that represent
+     *            {@link Throwable#printStackTrace() printStackTrace} output,
+     *            split by newline
+     * @return An exception object whose string representation of
+     *         printStackTrace is the input stackTrace
+     * @throws Exception
+     */
+    private Throwable getExceptionFromStrings(String[] stackTraceLines,
+            int startingLineNum) throws Exception {
+        /*
+         * parse the array of string and throw the appropriate exception first:
+         * from the line startingLineNum extract the exception name extract the
+         * message if any fourth: create the appropriate exception and return it
+         * An example of the stack trace:
+         * org.apache.pig.backend.executionengine.ExecException: ERROR 1075:
+         * Received a bytearray from the UDF. Cannot determine how to convert
+         * the bytearray to int. at
+         * org.apache.pig.backend.hadoop.executionengine
+         * .physicalLayer.expressionOperators.POCast.getNext(POCast.java:152) at
+         * org.apache.pig.backend.hadoop.executionengine.physicalLayer.
+         * expressionOperators.LessThanExpr.getNext(LessThanExpr.java:85) at
+         * org.apache.pig.backend.hadoop.executionengine.physicalLayer.
+         * relationalOperators.POFilter.getNext(POFilter.java:148) at
+         * org.apache.
+         * pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase
+         * .runPipeline(PigMapBase.java:184) at
+         * org.apache.pig.backend.hadoop.executionengine
+         * .mapReduceLayer.PigMapBase.map(PigMapBase.java:174) at
+         * org.apache.pig.
+         * backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map
+         * .map(PigMapOnly.java:65) at
+         * org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47) at
+         * org.apache.hadoop.mapred.MapTask.run(MapTask.java:227) at
+         * org.apache.hadoop
+         * .mapred.TaskTracker$Child.main(TaskTracker.java:2207)
+         */
+
+        if (stackTraceLines.length > 0
+                && startingLineNum < (stackTraceLines.length - 1)) {
+
+            // the regex for matching the exception class name; note the use of
+            // the $ for matching nested classes
+            String exceptionNameDelimiter = "(\\w+(\\$\\w+)?\\.)+\\w+";
+            Pattern exceptionNamePattern = Pattern
+                    .compile(exceptionNameDelimiter);
+
+            // from the first line extract the exception name and the exception
+            // message
+            Matcher exceptionNameMatcher = exceptionNamePattern
+                    .matcher(stackTraceLines[startingLineNum]);
+            String exceptionName = null;
+            String exceptionMessage = null;
+            if (exceptionNameMatcher.find()) {
+                exceptionName = exceptionNameMatcher.group();
+                /*
+                 * note that the substring is from end + 2 the regex matcher
+                 * ends at one position beyond the match in this case it will
+                 * end at colon (:) the exception message will have a preceding
+                 * space (after the colon (:))
+                 */
+                if (exceptionName.contains(OOM_ERR)) {
+                    outOfMemory = true;
+                }
+
+                if (stackTraceLines[startingLineNum].length() > exceptionNameMatcher
+                        .end()) {
+                    exceptionMessage = stackTraceLines[startingLineNum]
+                            .substring(exceptionNameMatcher.end() + 2);
+                }
+
+                ++startingLineNum;
+            }
+
+            // the exceptionName should not be null
+            if (exceptionName != null) {
+
+                ArrayList<StackTraceElement> stackTraceElements = new ArrayList<StackTraceElement>();
+
+                // Create stack trace elements for the remaining lines
+                String stackElementRegex = "\\s+at\\s+(\\w+(\\$\\w+)?\\.)+(\\<)?\\w+(\\>)?";
+                Pattern stackElementPattern = Pattern
+                        .compile(stackElementRegex);
+                String pigExceptionRegex = "org\\.apache\\.pig\\.";
+                Pattern pigExceptionPattern = Pattern
+                        .compile(pigExceptionRegex);
+                String moreElementRegex = "\\s+\\.\\.\\.\\s+\\d+\\s+more";
+                Pattern moreElementPattern = Pattern.compile(moreElementRegex);
+
+                int lineNum = startingLineNum;
+                for (; lineNum < (stackTraceLines.length - 1); ++lineNum) {
+                    Matcher stackElementMatcher = stackElementPattern
+                            .matcher(stackTraceLines[lineNum]);
+
+                    if (stackElementMatcher.find()) {
+                        StackTraceElement ste = getStackTraceElement(stackTraceLines[lineNum]);
+                        stackTraceElements.add(ste);
+                        String className = ste.getClassName();
+                        Matcher pigExceptionMatcher = pigExceptionPattern
+                                .matcher(className);
+                        if (pigExceptionMatcher.find()) {
+                            pigException = true;
+                        }
+                    } else {
+                        Matcher moreElementMatcher = moreElementPattern
+                                .matcher(stackTraceLines[lineNum]);
+                        if (moreElementMatcher.find()) {
+                            ++lineNum;
+                        }
+                        break;
+                    }
+                }
+
+                startingLineNum = lineNum;
+
+                // create the appropriate exception; setup the stack trace and
+                // message
+                Object object = PigContext
+                        .instantiateFuncFromSpec(exceptionName);
+
+                if (object instanceof PigException) {
+                    // extract the error code and message the regex for matching
+                    // the custom format of ERROR <ERROR CODE>:
+                    String errMessageRegex = "ERROR\\s+\\d+:";
+                    Pattern errMessagePattern = Pattern
+                            .compile(errMessageRegex);
+                    Matcher errMessageMatcher = errMessagePattern
+                            .matcher(exceptionMessage);
+
+                    if (errMessageMatcher.find()) {
+                        String errMessageStub = errMessageMatcher.group();
+                        /*
+                         * extract the actual exception message sans the ERROR
+                         * <ERROR CODE>: again note that the matcher ends at the
+                         * space following the colon (:) the exception message
+                         * appears after the space and hence the end + 1
+                         */
+                        exceptionMessage = exceptionMessage
+                                .substring(errMessageMatcher.end() + 1);
+
+                        // the regex to match the error code wich is a string of
+                        // numerals
+                        String errCodeRegex = "\\d+";
+                        Pattern errCodePattern = Pattern.compile(errCodeRegex);
+                        Matcher errCodeMatcher = errCodePattern
+                                .matcher(errMessageStub);
+
+                        String code = null;
+                        if (errCodeMatcher.find()) {
+                            code = errCodeMatcher.group();
+                        }
+
+                        // could receive a number format exception here but it
+                        // will be propagated up the stack
+                        int errCode;
+                        if (code != null)
+                            errCode = Integer.parseInt(code);
+                        else
+                            errCode = 2998;
+
+                        // create the exception with the message and then set
+                        // the error code and error source
+                        FuncSpec funcSpec = new FuncSpec(exceptionName,
+                                exceptionMessage);
+                        object = PigContext.instantiateFuncFromSpec(funcSpec);
+                        ((PigException) object).setErrorCode(errCode);
+                        ((PigException) object).setErrorSource(PigException
+                                .determineErrorSource(errCode));
+                    } else { // else for if(errMessageMatcher.find())
+                        /*
+                         * did not find the error code which means that the
+                         * PigException or its subclass is not returning the
+                         * error code highly unlikely: should never be here
+                         */
+                        FuncSpec funcSpec = new FuncSpec(exceptionName,
+                                exceptionMessage);
+                        object = PigContext.instantiateFuncFromSpec(funcSpec);
+                        ((PigException) object).setErrorCode(2997);// generic
+                                                                    // error
+                                                                    // code
+                        ((PigException) object)
+                                .setErrorSource(PigException.BUG);
+                    }
+                } else { // else for if(object instanceof PigException)
+                    // its not PigException; create the exception with the
+                    // message
+                    object = PigContext.instantiateFuncFromSpec(new FuncSpec(
+                            exceptionName, exceptionMessage));
+                }
+
+                StackTraceElement[] steArr = new StackTraceElement[stackTraceElements
+                        .size()];
+                ((Throwable) object).setStackTrace(stackTraceElements
+                        .toArray(steArr));
+
+                if (startingLineNum < (stackTraceLines.length - 1)) {
+                    Throwable e = getExceptionFromStrings(stackTraceLines,
+                            startingLineNum);
+                    ((Throwable) object).initCause(e);
+                }
+
+                return (Throwable) object;
+            } else { // else for if(exceptionName != null)
+                int errCode = 2055;
+                String msg = "Did not find exception name to create exception from string: "
+                        + Arrays.toString(stackTraceLines);
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+        } else { // else for if(lines.length > 0)
+            int errCode = 2056;
+            String msg = "Cannot create exception from empty string.";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    /**
+     * 
+     * @param line
+     *            the string representation of a stack trace returned by
+     *            {@link Throwable#printStackTrace() printStackTrace}
+     * @return the StackTraceElement object representing the stack trace
+     * @throws Exception
+     */
+    public StackTraceElement getStackTraceElement(String line) throws Exception {
+        /*
+         * the format of the line is something like: at
+         * org.apache.pig.backend.hadoop
+         * .executionengine.mapReduceLayer.PigMapOnly$Map
+         * .map(PigMapOnly.java:65) note the white space before the 'at'. Its
+         * not of much importance but noted for posterity.
+         */
+        String[] items;
+
+        /*
+         * regex for matching the fully qualified method Name note the use of
+         * the $ for matching nested classes and the use of < and > for
+         * constructors
+         */
+        String qualifiedMethodNameRegex = "(\\w+(\\$\\w+)?\\.)+(<)?\\w+(>)?";
+        Pattern qualifiedMethodNamePattern = Pattern
+                .compile(qualifiedMethodNameRegex);
+        Matcher contentMatcher = qualifiedMethodNamePattern.matcher(line);
+
+        // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map(PigMapOnly.java:65)
+        String content = null;
+        if (contentMatcher.find()) {
+            content = line.substring(contentMatcher.start());
+        } else {
+            int errCode = 2057;
+            String msg = "Did not find fully qualified method name to reconstruct stack trace: "
+                    + line;
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        Matcher qualifiedMethodNameMatcher = qualifiedMethodNamePattern
+                .matcher(content);
+
+        // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map
+        String qualifiedMethodName = null;
+        // (PigMapOnly.java:65)
+        String fileDetails = null;
+
+        if (qualifiedMethodNameMatcher.find()) {
+            qualifiedMethodName = qualifiedMethodNameMatcher.group();
+            fileDetails = content
+                    .substring(qualifiedMethodNameMatcher.end() + 1);
+        } else {
+            int errCode = 2057;
+            String msg = "Did not find fully qualified method name to reconstruct stack trace: "
+                    + line;
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        // From the fully qualified method name, extract the declaring class and
+        // method name
+        items = qualifiedMethodName.split("\\.");
+
+        // initialize the declaringClass (to org in most cases)
+        String declaringClass = items[0];
+        // the last member is always the method name
+        String methodName = items[items.length - 1];
+        StringBuilder sb = new StringBuilder();
+
+        // concatenate the names by adding the dot (.) between the members till
+        // the penultimate member
+        for (int i = 1; i < items.length - 1; ++i) {
+            sb.append('.');
+            sb.append(items[i]);
+        }
+
+        declaringClass += sb.toString();
+
+        // from the file details extract the file name and the line number
+        // PigMapOnly.java:65
+        fileDetails = fileDetails.substring(0, fileDetails.length() - 1);
+        items = fileDetails.split(":");
+        // PigMapOnly.java
+        String fileName = null;
+        int lineNumber = -1;
+        if (items.length > 0) {
+            fileName = items[0];
+            if (items.length > 1) {
+                lineNumber = Integer.parseInt(items[1]);
+            }
+        }
+        return new StackTraceElement(declaringClass, methodName, fileName,
+                lineNumber);
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketImplFactory;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.optimizer.UidResetter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
+import org.apache.pig.newplan.logical.visitor.StoreAliasSetter;
+import org.apache.pig.pen.POOptimizeDisabler;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+public class MRExecutionEngine implements ExecutionEngine {
+
+    public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+    private static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
+
+    private static final String HADOOP_SITE = "hadoop-site.xml";
+    private static final String CORE_SITE = "core-site.xml";
+    private static final String YARN_SITE = "yarn-site.xml";
+    private final Log log = LogFactory.getLog(getClass());
+    public static final String LOCAL = "local";
+
+    protected PigContext pigContext;
+
+    protected DataStorage ds;
+
+    @SuppressWarnings("deprecation")
+    protected JobConf jobConf;
+
+    // key: the operator key from the logical plan that originated the physical
+    // plan
+    // val: the operator key for the root of the phyisical plan
+    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+
+    protected Map<Operator, PhysicalOperator> newLogToPhyMap;
+    private LogicalPlan newPreoptimizedPlan;
+
+    protected Launcher launcher;
+
+    public MRExecutionEngine(PigContext pigContext) {
+        this.pigContext = pigContext;
+        this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+
+        this.ds = null;
+
+        // to be set in the init method
+        this.jobConf = null;
+
+        this.launcher = new MapReduceLauncher();
+    }
+
+    @SuppressWarnings("deprecation")
+    public JobConf getJobConf() {
+        return this.jobConf;
+    }
+
+    @Override
+    public DataStorage getDataStorage() {
+        return this.ds;
+    }
+
+    @Override
+    public void init() throws ExecException {
+        init(this.pigContext.getProperties());
+    }
+
+    @SuppressWarnings({ "deprecation", "resource" })
+    private void init(Properties properties) throws ExecException {
+        // First set the ssh socket factory
+        setSSHFactory();
+
+        String cluster = null;
+        String nameNode = null;
+
+        // We need to build a configuration object first in the manner described
+        // below
+        // and then get back a properties object to inspect the
+        // JOB_TRACKER_LOCATION
+        // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only
+        // at
+        // the existing properties object, we may not get the right settings. So
+        // we want
+        // to read the configurations in the order specified below and only then
+        // look
+        // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
+
+        // Hadoop by default specifies two resources, loaded in-order from the
+        // classpath:
+        // 1. hadoop-default.xml : Read-only defaults for hadoop.
+        // 2. hadoop-site.xml: Site-specific configuration for a given hadoop
+        // installation.
+        // Now add the settings from "properties" object to override any
+        // existing properties
+        // All of the above is accomplished in the method call below
+
+        JobConf jc = null;
+        if (!this.pigContext.getExecType().isLocal()) {
+            // Check existence of user provided configs
+            String isHadoopConfigsOverriden = properties
+                    .getProperty("pig.use.overriden.hadoop.configs");
+            if (isHadoopConfigsOverriden != null
+                    && isHadoopConfigsOverriden.equals("true")) {
+                jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
+            } else {
+                // Check existence of hadoop-site.xml or core-site.xml in
+                // classpath
+                // if user provided confs are not being used
+                Configuration testConf = new Configuration();
+                ClassLoader cl = testConf.getClassLoader();
+                URL hadoop_site = cl.getResource(HADOOP_SITE);
+                URL core_site = cl.getResource(CORE_SITE);
+
+                if (hadoop_site == null && core_site == null) {
+                    throw new ExecException(
+                            "Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath)."
+                                    + " If you plan to use local mode, please put -x local option in command line",
+                            4010);
+                }
+                jc = new JobConf();
+            }
+            jc.addResource("pig-cluster-hadoop-site.xml");
+            jc.addResource(YARN_SITE);
+
+            // Trick to invoke static initializer of DistributedFileSystem to
+            // add hdfs-default.xml
+            // into configuration
+            new DistributedFileSystem();
+
+            // the method below alters the properties object by overriding the
+            // hadoop properties with the values from properties and recomputing
+            // the properties
+            recomputeProperties(jc, properties);
+        } else {
+            // If we are running in local mode we dont read the hadoop conf file
+            if (properties.getProperty("mapreduce.framework.name") == null) {
+                properties.setProperty("mapreduce.framework.name", "local");
+            }
+            properties.setProperty(JOB_TRACKER_LOCATION, LOCAL);
+            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+            properties
+                    .setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+
+            jc = new JobConf(false);
+            jc.addResource("core-default.xml");
+            jc.addResource("mapred-default.xml");
+            jc.addResource("yarn-default.xml");
+            recomputeProperties(jc, properties);
+        }
+
+        cluster = jc.get(JOB_TRACKER_LOCATION);
+        nameNode = jc.get(FILE_SYSTEM_LOCATION);
+        if (nameNode == null)
+            nameNode = (String) pigContext.getProperties().get(
+                    ALTERNATIVE_FILE_SYSTEM_LOCATION);
+
+        if (cluster != null && cluster.length() > 0) {
+            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
+                cluster = cluster + ":50020";
+            }
+            properties.setProperty(JOB_TRACKER_LOCATION, cluster);
+        }
+
+        if (nameNode != null && nameNode.length() > 0) {
+            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
+                nameNode = nameNode + ":8020";
+            }
+            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
+        }
+
+        log.info("Connecting to hadoop file system at: "
+                + (nameNode == null ? LOCAL : nameNode));
+        // constructor sets DEFAULT_REPLICATION_FACTOR_KEY
+        ds = new HDataStorage(properties);
+
+        if (cluster != null && !cluster.equalsIgnoreCase(LOCAL)) {
+            log.info("Connecting to map-reduce job tracker at: "
+                    + jc.get(JOB_TRACKER_LOCATION));
+        }
+
+        // Set job-specific configuration knobs
+        jobConf = jc;
+    }
+
+    @SuppressWarnings("unchecked")
+    public PhysicalPlan compile(LogicalPlan plan, Properties properties)
+            throws FrontendException {
+        if (plan == null) {
+            int errCode = 2041;
+            String msg = "No Plan to compile";
+            throw new FrontendException(msg, errCode, PigException.BUG);
+        }
+
+        newPreoptimizedPlan = new LogicalPlan(plan);
+
+        if (pigContext.inIllustrator) {
+            // disable all PO-specific optimizations
+            POOptimizeDisabler pod = new POOptimizeDisabler(plan);
+            pod.visit();
+        }
+
+        UidResetter uidResetter = new UidResetter(plan);
+        uidResetter.visit();
+
+        SchemaResetter schemaResetter = new SchemaResetter(plan, true /*
+                                                                     * skip
+                                                                     * duplicate
+                                                                     * uid check
+                                                                     */);
+        schemaResetter.visit();
+
+        HashSet<String> disabledOptimizerRules;
+        try {
+            disabledOptimizerRules = (HashSet<String>) ObjectSerializer
+                    .deserialize(pigContext.getProperties().getProperty(
+                            PigImplConstants.PIG_OPTIMIZER_RULES_KEY));
+        } catch (IOException ioe) {
+            int errCode = 2110;
+            String msg = "Unable to deserialize optimizer rules.";
+            throw new FrontendException(msg, errCode, PigException.BUG, ioe);
+        }
+        if (disabledOptimizerRules == null) {
+            disabledOptimizerRules = new HashSet<String>();
+        }
+
+        String pigOptimizerRulesDisabled = this.pigContext.getProperties()
+                .getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY);
+        if (pigOptimizerRulesDisabled != null) {
+            disabledOptimizerRules.addAll(Lists.newArrayList((Splitter.on(",")
+                    .split(pigOptimizerRulesDisabled))));
+        }
+
+        if (pigContext.inIllustrator) {
+            disabledOptimizerRules.add("MergeForEach");
+            disabledOptimizerRules.add("PartitionFilterOptimizer");
+            disabledOptimizerRules.add("LimitOptimizer");
+            disabledOptimizerRules.add("SplitFilter");
+            disabledOptimizerRules.add("PushUpFilter");
+            disabledOptimizerRules.add("MergeFilter");
+            disabledOptimizerRules.add("PushDownForEachFlatten");
+            disabledOptimizerRules.add("ColumnMapKeyPrune");
+            disabledOptimizerRules.add("AddForEach");
+            disabledOptimizerRules.add("GroupByConstParallelSetter");
+        }
+
+        StoreAliasSetter storeAliasSetter = new StoreAliasSetter(plan);
+        storeAliasSetter.visit();
+
+        // run optimizer
+        LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(plan, 100,
+                disabledOptimizerRules);
+        optimizer.optimize();
+
+        // compute whether output data is sorted or not
+        SortInfoSetter sortInfoSetter = new SortInfoSetter(plan);
+        sortInfoSetter.visit();
+
+        if (!pigContext.inExplain) {
+            // Validate input/output file. Currently no validation framework in
+            // new logical plan, put this validator here first.
+            // We might decide to move it out to a validator framework in future
+            InputOutputFileValidator validator = new InputOutputFileValidator(
+                    plan, pigContext);
+            validator.validate();
+        }
+
+        // translate new logical plan to physical plan
+        LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(
+                plan);
+
+        translator.setPigContext(pigContext);
+        translator.visit();
+        newLogToPhyMap = translator.getLogToPhyMap();
+        return translator.getPhysicalPlan();
+    }
+
+    public Map<Operator, PhysicalOperator> getLogToPhyMap() {
+        return newLogToPhyMap;
+    }
+
+    public Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap(
+            LogicalPlan plan) {
+        Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> result = new HashMap<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>>();
+        Iterator<Operator> outerIter = plan.getOperators();
+        while (outerIter.hasNext()) {
+            Operator oper = outerIter.next();
+            if (oper instanceof LOForEach) {
+                LogicalPlan innerPlan = ((LOForEach) oper).getInnerPlan();
+                Map<LogicalRelationalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalRelationalOperator, PhysicalOperator>();
+                Iterator<Operator> innerIter = innerPlan.getOperators();
+                while (innerIter.hasNext()) {
+                    Operator innerOper = innerIter.next();
+                    innerOpMap.put(((LogicalRelationalOperator) innerOper),
+                            newLogToPhyMap.get(innerOper));
+                }
+                result.put((LOForEach) oper, innerOpMap);
+            }
+        }
+        return result;
+    }
+
+    public LogicalPlan getNewPlan() {
+        return newPreoptimizedPlan;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected void setSSHFactory() {
+        Properties properties = this.pigContext.getProperties();
+        String g = properties.getProperty("ssh.gateway");
+        if (g == null || g.length() == 0)
+            return;
+        try {
+            Class clazz = Class
+                    .forName("org.apache.pig.shock.SSHSocketImplFactory");
+            SocketImplFactory f = (SocketImplFactory) clazz.getMethod(
+                    "getFactory", new Class[0]).invoke(0, new Object[0]);
+            Socket.setSocketImplFactory(f);
+        } catch (SocketException e) {
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Method to apply pig properties to JobConf (replaces properties with
+     * resulting jobConf values)
+     * 
+     * @param conf
+     *            JobConf with appropriate hadoop resource files
+     * @param properties
+     *            Pig properties that will override hadoop properties;
+     *            properties might be modified
+     */
+    @SuppressWarnings("deprecation")
+    protected void recomputeProperties(JobConf jobConf, Properties properties) {
+        // We need to load the properties from the hadoop configuration
+        // We want to override these with any existing properties we have.
+        if (jobConf != null && properties != null) {
+            // set user properties on the jobConf to ensure that defaults
+            // and deprecation is applied correctly
+            Enumeration<Object> propertiesIter = properties.keys();
+            while (propertiesIter.hasMoreElements()) {
+                String key = (String) propertiesIter.nextElement();
+                String val = properties.getProperty(key);
+                // We do not put user.name, See PIG-1419
+                if (!key.equals("user.name"))
+                    jobConf.set(key, val);
+            }
+            // clear user defined properties and re-populate
+            properties.clear();
+            Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
+            while (iter.hasNext()) {
+                Map.Entry<String, String> entry = iter.next();
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    public ScriptState instantiateScriptState() {
+        return new MRScriptState(UUID.randomUUID().toString());
+    }
+
+    @Override
+    public PigStats launchPig(LogicalPlan lp, String grpName, PigContext pc)
+            throws FrontendException, ExecException {
+
+        try {
+            PhysicalPlan pp = compile(lp, pc.getProperties());
+            return launcher.launchPig(pp, grpName, pigContext);
+        } catch (ExecException e) {
+            throw (ExecException) e;
+        } catch (FrontendException e) {
+            throw (FrontendException) e;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        } finally {
+            launcher.reset();
+        }
+    }
+
+    @Override
+    public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
+            String format, boolean verbose, File file, String suffix)
+            throws PlanException, VisitorException, IOException,
+            FrontendException {
+
+        PrintStream pps = ps;
+        PrintStream eps = ps;
+
+        try {
+
+            if (file != null) {
+                pps = new PrintStream(new File(file, "physical_plan-" + suffix));
+                eps = new PrintStream(new File(file, "exec_plan-" + suffix));
+            }
+
+            PhysicalPlan pp = compile(lp, pc.getProperties());
+            pp.explain(pps, format, verbose);
+
+            MapRedUtil.checkLeafIsStore(pp, pigContext);
+            launcher.explain(pp, pigContext, eps, format, verbose);
+        } finally {
+            launcher.reset();
+            pps.close();
+            eps.close();
+        }
+    }
+
+    @Override
+    public Properties getConfiguration() {
+        if (jobConf == null)
+            return null;
+        return ConfigurationUtil.toProperties(jobConf);
+    }
+
+    @Override
+    public void setConfiguration(Properties newConfiguration)
+            throws ExecException {
+        init(newConfiguration);
+    }
+
+    @Override
+    public void setProperty(String property, String value) {
+        // mPigServer.getPigContext().getProperties().setProperty(key, value);
+        // PIG-2508 properties need to be managed through JobConf
+        // since all other code depends on access to properties,
+        // we need to re-populate from updated JobConf
+        // java.util.HashSet<?> keysBefore = new
+        // java.util.HashSet<Object>(mPigServer.getPigContext().getProperties().keySet());
+        // set current properties on jobConf
+        Properties properties = pigContext.getProperties();
+        Enumeration<Object> propertiesIter = pigContext.getProperties().keys();
+        while (propertiesIter.hasMoreElements()) {
+            String pkey = (String) propertiesIter.nextElement();
+            String val = properties.getProperty(pkey);
+            // We do not put user.name, See PIG-1419
+            if (!pkey.equals("user.name"))
+                jobConf.set(pkey, val);
+        }
+        // set new value, JobConf will handle deprecation etc.
+        jobConf.set(property, value);
+        // re-initialize to reflect updated JobConf
+        properties.clear();
+        Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = iter.next();
+            properties.put(entry.getKey(), entry.getValue());
+        }
+        // keysBefore.removeAll(mPigServer.getPigContext().getProperties().keySet());
+        // log.info("PIG-2508: keys dropped from properties: " + keysBefore);
+    }
+
+    @Override
+    public ExecutableManager getExecutableManager() {
+        return new HadoopExecutableManager();
+    }
+
+    @Override
+    public void kill() throws BackendException {
+        if (launcher != null) {
+            launcher.kill();
+        }
+    }
+
+    @Override
+    public void killJob(String jobID) throws BackendException {
+        if (launcher != null) {
+            launcher.killJob(jobID, jobConf);
+        }
+    }
+
+}