You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/30 22:04:31 UTC
svn commit: r1519062 [1/5] - in /pig/trunk: ./ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ s...
Author: cheolsoo
Date: Fri Aug 30 20:04:29 2013
New Revision: 1519062
URL: http://svn.apache.org/r1519062
Log:
PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
Added:
pig/trunk/src/META-INF/services/org.apache.pig.ExecType
pig/trunk/src/org/apache/pig/ExecTypeProvider.java
pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
pig/trunk/test/org/apache/pig/test/TestMRExecutionEngine.java
pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
Removed:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java
pig/trunk/test/org/apache/pig/test/TestHExecutionEngine.java
pig/trunk/test/org/apache/pig/test/TestJobStats.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/src/org/apache/pig/ExecType.java
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/trunk/test/org/apache/pig/test/TestCounters.java
pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java
pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
pig/trunk/test/org/apache/pig/test/TestNumberOfReducers.java
pig/trunk/test/org/apache/pig/test/TestParser.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
pig/trunk/test/org/apache/pig/test/TestPigSplit.java
pig/trunk/test/org/apache/pig/test/TestPigStats.java
pig/trunk/test/org/apache/pig/test/TestProjectRange.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Aug 30 20:04:29 2013
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
+
PIG-3191: [piggybank] MultiStorage output filenames are not sortable (Danny Antonelli via jcoveney)
PIG-3174: Remove rpm and deb artifacts from build.xml (gates)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Aug 30 20:04:29 2013
@@ -527,6 +527,9 @@
<echo>*** Else, you will only be warned about deprecations ***</echo>
<compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}"
excludes="" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
+ <copy todir="${build.classes}/META-INF">
+ <fileset dir="${src.dir}/META-INF" includes="**"/>
+ </copy>
</target>
<target name="compile-test" depends="compile, ivy-test">
@@ -536,7 +539,6 @@
<compileSources sources="${test.src.dir};${src.shims.test.dir}"
excludes="**/PigTestLoader.java **/resources/** perf/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
<copy file="${basedir}/test/hbase-site.xml" tofile="${test.build.classes}/hbase-site.xml"/>
-
<ivy:cachepath pathid="mr-apps-test-ivy.classpath" conf="test" />
<path id="mr-apps-test.classpath">
<fileset dir="${clover.home}" erroronmissingdir="false">
Added: pig/trunk/src/META-INF/services/org.apache.pig.ExecType
URL: http://svn.apache.org/viewvc/pig/trunk/src/META-INF/services/org.apache.pig.ExecType?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/META-INF/services/org.apache.pig.ExecType (added)
+++ pig/trunk/src/META-INF/services/org.apache.pig.ExecType Fri Aug 30 20:04:29 2013
@@ -0,0 +1,16 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType
+
Modified: pig/trunk/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ExecType.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/ExecType.java (original)
+++ pig/trunk/src/org/apache/pig/ExecType.java Fri Aug 30 20:04:29 2013
@@ -19,36 +19,70 @@
package org.apache.pig;
import java.io.Serializable;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType;
+import org.apache.pig.impl.PigContext;
/**
- * The type of query execution
+ * The type of query execution. Pig will cycle through all implementations
+ * of ExecType and choose the first one that matches the Properties passed in.
+ * This ExecType then dictates the ExecutionEngine used for processing and
+ * other behaviour throughout Pig. Any implementing classes should be noted in
+ * the META-INF/services folder titled org.apache.pig.ExecType as per the
+ * Java ServiceLoader specification.
*/
-public enum ExecType implements Serializable {
+public interface ExecType extends Serializable {
+
+ public static final ExecType LOCAL = new LocalExecType();
+ public static final ExecType MAPREDUCE = new MRExecType();
+
+ /**
+ * An ExecType is selected based off the Properties for the given script.
+ * There may be multiple settings that trigger the selection of a given
+ * ExecType. For example, distributed MR mode is currently triggered if the
+ * user specifies "mapred" or "mapreduce". It is desirable to override the
+ * toString method of the given ExecType to uniquely identify the ExecType.
+ *
+ * The accepts method should return true if it accepts the properties or
+ * false if it does not. The Java ServiceLoader framework will be used to
+ * iterate through and select the correct ExecType.
+ */
+
+ public boolean accepts(Properties properties);
+
/**
- * Run everything on the local machine
+ * Returns the Execution Engine that this ExecType is associated with. Once
+ * the ExecType the script is running in is determined by the PigServer, it
+ * will then call this method to get an instance of the ExecutionEngine
+ * associated with this ExecType to delegate all further execution to on the
+ * backend.
*/
- LOCAL,
+ public ExecutionEngine getExecutionEngine(PigContext pigContext);
+
/**
- * Use the Hadoop Map/Reduce framework
+ * Returns the Execution Engine class that this ExecType is associated with.
+ * This method simply returns the class of the ExecutionEngine associated
+ * with this ExecType, and not an instance of it.
*/
- MAPREDUCE;
+ public Class<? extends ExecutionEngine> getExecutionEngineClass();
/**
- * Given a string, determine the exec type.
- * @param execString accepted values are 'local', 'mapreduce', and 'mapred'
- * @return exectype as ExecType
+ * An ExecType is classified as local if it runs in-process and through the
+ * local filesystem. While an ExecutionEngine may have a more nuanced notion
+ * of local mode, these are the qualifications Pig requires.
+ * ExecutionEngines can extend the ExecType interface to additionally
+ * differentiate between ExecTypes as necessary.
*/
- public static ExecType fromString(String execString) throws PigException {
- if (execString.equals("mapred")) {
- return MAPREDUCE;
- } else {
- try {
- return ExecType.valueOf(execString.toUpperCase());
- } catch (IllegalArgumentException e) {
- int errCode = 2040;
- String msg = "Unknown exec type: " + execString;
- throw new PigException(msg, errCode, e);
- }
- }
- }
+ public boolean isLocal();
+
+
+ /**
+ * Returns the canonical name for this ExecType.
+ * @return
+ */
+ public String name();
+
}
Added: pig/trunk/src/org/apache/pig/ExecTypeProvider.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ExecTypeProvider.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ExecTypeProvider.java (added)
+++ pig/trunk/src/org/apache/pig/ExecTypeProvider.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType;
+import org.apache.pig.impl.util.PropertiesUtil;
+
+public class ExecTypeProvider {
+
+ private static final Log log = LogFactory.getLog(ExecTypeProvider.class);
+
+ public static ExecType selectExecType(Properties properties)
+ throws PigException {
+
+ ServiceLoader<ExecType> frameworkLoader = ServiceLoader
+ .load(ExecType.class);
+
+ for (ExecType execType : frameworkLoader) {
+ log.info("Trying ExecType : " + execType);
+ if (execType.accepts(properties)) {
+ log.debug("Picked " + execType + " as the ExecType");
+ return getSingleton(execType);
+ } else {
+ log.debug("Cannot pick " + execType + " as the ExecType");
+ }
+ }
+ throw new PigException("Unknown exec type: "
+ + properties.getProperty("exectype"), 2040);
+ }
+
+ /**
+ * This method attempts to return a singleton instance of the given exec
+ * type. Only works for MR ExecTypes as these are the only ExecTypes that we
+ * have constants in the Pig codebase for.
+ *
+ * @param execType
+ * @return
+ */
+ private static ExecType getSingleton(ExecType execType) {
+ if (execType instanceof MRExecType) {
+ return ExecType.MAPREDUCE;
+ }
+ if (execType instanceof LocalExecType) {
+ return ExecType.LOCAL;
+ }
+ // if it is not MR specific but rather a different
+ // execution engine, we don't have access to any
+ // constants that can act as singletons, so we just
+ // use the given instance
+ return execType;
+ }
+
+ public static ExecType fromString(String execType) throws PigException {
+ Properties properties = PropertiesUtil.loadDefaultProperties();
+ properties.setProperty("exectype", execType);
+ return selectExecType(properties);
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Aug 30 20:04:29 2013
@@ -211,12 +211,7 @@ static int run(String args[], PigProgres
ExecMode mode = ExecMode.UNKNOWN;
String file = null;
- String engine = null;
- ExecType execType = ExecType.MAPREDUCE ;
- String execTypeString = properties.getProperty("exectype");
- if(execTypeString!=null && execTypeString.length()>0){
- execType = ExecType.fromString(execTypeString);
- }
+ String engine = null;
// set up client side system properties in UDF context
UDFContext.getUDFContext().setClientSystemProps(properties);
@@ -328,13 +323,9 @@ static int run(String args[], PigProgres
break;
case 'x':
- try {
- execType = ExecType.fromString(opts.getValStr());
- } catch (IOException e) {
- throw new RuntimeException("ERROR: Unrecognized exectype.", e);
- }
+ properties.setProperty("exectype", opts.getValStr());
break;
-
+
case 'P':
{
InputStream inputStream = null;
@@ -361,8 +352,9 @@ static int run(String args[], PigProgres
}
}
}
+
// create the context with the parameter
- PigContext pigContext = new PigContext(execType, properties);
+ PigContext pigContext = new PigContext(properties);
// create the static script state object
String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
@@ -460,7 +452,6 @@ static int run(String args[], PigProgres
pigContext.getProperties().setProperty(PigContext.JOB_NAME,
"PigLatin:" +new File(file).getName()
);
-
if (!debug) {
new File(substFile).deleteOnExit();
}
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Aug 30 20:04:29 2013
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -54,10 +53,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.executionengine.HJob;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -125,7 +120,7 @@ public class PigServer {
public static final String PRETTY_PRINT_SCHEMA_PROPERTY = "pig.pretty.print.schema";
private static final String PIG_LOCATION_CHECK_STRICT = "pig.location.check.strict";
- /*
+ /*
* The data structure to support grunt shell operations.
* The grunt shell can only work on one graph at a time.
* If a script is contained inside another script, the grunt
@@ -174,12 +169,23 @@ public class PigServer {
/**
* @param execTypeString can be 'mapreduce' or 'local'. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
- * Mapreduce mode will connect to a cluster to execute the job.
- * @throws ExecException
+ * Mapreduce mode will connect to a cluster to execute the job. If
+ * execTypeString is not one of these two, Pig will deduce the ExecutionEngine
+ * if it is on the classpath and use it for the backend execution.
+ * @throws ExecException
* @throws IOException
*/
public PigServer(String execTypeString) throws ExecException, IOException {
- this(ExecType.fromString(execTypeString));
+ this(addExecTypeProperty(PropertiesUtil.loadDefaultProperties(), execTypeString));
+ }
+
+ public PigServer(Properties properties) throws ExecException, IOException {
+ this(new PigContext(properties));
+ }
+
+ private static Properties addExecTypeProperty(Properties properties, String execType) {
+ properties.setProperty("exectype", execType);
+ return properties;
}
/**
@@ -992,7 +998,7 @@ public class PigServer {
*/
public void explain(String alias,
PrintStream stream) throws IOException {
- explain(alias, "text", true, false, stream, stream, stream);
+ explain(alias, "text", true, false, stream, stream, null, null);
}
/**
@@ -1006,8 +1012,9 @@ public class PigServer {
* call to execute in the respoect that all the pending stores are
* marked as complete.
* @param lps Stream to print the logical tree
- * @param pps Stream to print the physical tree
- * @param eps Stream to print the execution tree
+ * @param eps Stream to print the ExecutionEngine trees. If null, then will print to files
+ * @param dir Directory to print ExecutionEngine trees. If null, will use eps
+ * @param suffix Suffix of file names
* @throws IOException if the requested alias cannot be found.
*/
@SuppressWarnings("unchecked")
@@ -1016,25 +1023,20 @@ public class PigServer {
boolean verbose,
boolean markAsExecute,
PrintStream lps,
- PrintStream pps,
- PrintStream eps) throws IOException {
+ PrintStream eps,
+ File dir,
+ String suffix) throws IOException {
try {
pigContext.inExplain = true;
buildStorePlan( alias );
if( currDAG.lp.size() == 0 ) {
lps.println("Logical plan is empty.");
- pps.println("Physical plan is empty.");
- eps.println("Execution plan is empty.");
return;
+ } else {
+ currDAG.lp.explain(lps, format, verbose);
}
- PhysicalPlan pp = compilePp();
- currDAG.lp.explain(lps, format, verbose);
-
- pp.explain(pps, format, verbose);
- MapRedUtil.checkLeafIsStore(pp, pigContext);
- MapReduceLauncher launcher = new MapReduceLauncher();
- launcher.explain(pp, pigContext, eps, format, verbose);
+ pigContext.getExecutionEngine().explain(currDAG.lp, pigContext, eps, format, verbose, dir, suffix );
if (markAsExecute) {
currDAG.markAsExecuted();
@@ -1058,7 +1060,7 @@ public class PigServer {
* @throws IOException
*/
public long capacity() throws IOException {
- if (pigContext.getExecType() == ExecType.LOCAL) {
+ if (pigContext.getExecType().isLocal()) {
throw new IOException("capacity only supported for non-local execution");
}
else {
@@ -1290,9 +1292,8 @@ public class PigServer {
private PigStats executeCompiledLogicalPlan() throws ExecException, FrontendException {
// discover pig features used in this script
ScriptState.get().setScriptFeatures( currDAG.lp );
- PhysicalPlan pp = compilePp();
- return launchPlan(pp, "job_pigexec_");
+ return launchPlan(currDAG.lp, "job_pigexec_");
}
/**
@@ -1303,52 +1304,23 @@ public class PigServer {
* @throws ExecException
* @throws FrontendException
*/
- protected PigStats launchPlan(PhysicalPlan pp, String jobName) throws ExecException, FrontendException {
- MapReduceLauncher launcher = new MapReduceLauncher();
+ protected PigStats launchPlan(LogicalPlan lp, String jobName) throws ExecException, FrontendException {
+
PigStats stats = null;
try {
- stats = launcher.launchPig(pp, jobName, pigContext);
+ stats = pigContext.getExecutionEngine().launchPig(lp, jobName, pigContext);
+ } catch (ExecException e) {
+ throw (ExecException) e;
+ } catch (FrontendException e) {
+ throw (FrontendException) e;
} catch (Exception e) {
// There are a lot of exceptions thrown by the launcher. If this
// is an ExecException, just let it through. Else wrap it.
- if (e instanceof ExecException){
- throw (ExecException)e;
- } else if (e instanceof FrontendException) {
- throw (FrontendException)e;
- } else {
- int errCode = 2043;
- String msg = "Unexpected error during execution.";
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
- } finally {
- launcher.reset();
+ int errCode = 2043;
+ String msg = "Unexpected error during execution.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
}
- for (OutputStats output : stats.getOutputStats()) {
- if (!output.isSuccessful()) {
- POStore store = output.getPOStore();
- try {
- store.getStoreFunc().cleanupOnFailure(
- store.getSFile().getFileName(),
- new Job(output.getConf()));
- } catch (IOException e) {
- throw new ExecException(e);
- }
- } else {
- POStore store = output.getPOStore();
- try {
- store.getStoreFunc().cleanupOnSuccess(
- store.getSFile().getFileName(),
- new Job(output.getConf()));
- } catch (IOException e) {
- throw new ExecException(e);
- } catch (AbstractMethodError nsme) {
- // Just swallow it. This means we're running against an
- // older instance of a StoreFunc that doesn't implement
- // this method.
- }
- }
- }
return stats;
}
@@ -1363,11 +1335,6 @@ public class PigServer {
return currDAG.lp;
}
- private PhysicalPlan compilePp() throws FrontendException {
- // translate lp to physical plan
- return pigContext.getExecutionEngine().compile( currDAG.lp, null );
- }
-
private LogicalRelationalOperator getOperatorForAlias(String alias) throws IOException {
buildStorePlan (alias);
LogicalRelationalOperator op = (LogicalRelationalOperator)currDAG.getOperator( alias );
Added: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (added)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.executionengine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+/**
+ *
+ * The main interface bridging the front end and back end of Pig. This allows Pig
+ * to be run on multiple Execution Engines, and not be limited to only Hadoop
+ * MapReduce. The ExecutionEngines must support the following methods as these
+ * are all the access points for the Pig frontend for processing. Traditionally
+ * there is one ExecutionEngine created per processing job, but this is not necessary.
+ * The ExecutionEngine instance comes from the ExecType, and it can choose to reuse
+ * ExecutionEngine instances. All specifications for methods are listed below
+ * as well as expected behavior, and the ExecutionEngine must conform to these.
+ *
+ */
+
+
+public interface ExecutionEngine {
+
+ /**
+ * This method is responsible for the initialization of the ExecutionEngine.
+ * All necessary setup tasks and configuration should execute in the init()
+ * method. This method will be called via the PigContext object.
+ */
+ public void init() throws ExecException;
+
+ /**
+ * Responsible for updating the properties for the ExecutionEngine. The
+ * update may require reinitialization of the engine, perhaps through
+ * another call to init() if appropriate. This decision is delegated to the
+ * ExecutionEngine -- that is, the caller will not call init() after
+ * updating the properties.
+ *
+ * The Properties passed in should replace any configuration that occurred
+ * from previous Properties object. The Properties object should also be
+ * updated to reflect the deprecation/modifications that the ExecutionEngine
+ * may trigger.
+ *
+ * @param newConfiguration -- Properties object holding all configuration vals
+ */
+ public void setConfiguration(Properties newConfiguration)
+ throws ExecException;
+
+ /**
+ * Responsible for setting a specific property and value. This method may be
+ * called as a result of a user "SET" command in the script or elsewhere in
+ * Pig to set certain properties.
+ *
+ * The properties object of the PigContext should be updated with the
+ * property and value with deprecation/other configuration done by the
+ * ExecutionEngine reflected. The ExecutionEngine should also update its
+ * internal configuration view as well.
+ *
+ * @param property to update
+ * @param value to set for property
+ */
+ public void setProperty(String property, String value);
+
+ /**
+ * Returns the Properties representation of the ExecutionEngine
+ * configuration. The Properties object returned does not have to be the
+ * same object between distinct calls to getConfiguration(). The ExecutionEngine
+ * may create a new Properties object populated with all the properties
+ * each time.
+ */
+ public Properties getConfiguration();
+
+ /**
+ * This method is responsible for the actual execution of a LogicalPlan. No
+ * assumptions are made about the architecture of the ExecutionEngine, except
+ * that it is capable of executing the LogicalPlan representation of a
+ * script. The ExecutionEngine should take care of all cleanup after
+ * executing the logical plan and returns an implementation of PigStats that
+ * contains the relevant information/statistics of the execution of the
+ * script.
+ *
+ * @param lp -- plan to compile
+ * @param grpName -- group name for submission
+ * @param pc -- context for execution
+ * @throws ExecException
+ */
+ public PigStats launchPig(LogicalPlan lp, String grpName, PigContext pc)
+ throws FrontendException, ExecException;
+
+ /**
+ * This method handles the backend processing of the Explain command. Once
+ * again, no assumptions are made about the architecture of the
+ * ExecutionEngine, except that it is capable of "explaining" the
+ * LogicalPlan representation of a script. The ExecutionEngine should print
+ * all of its explain statements in the PrintStream provided UNLESS the
+ * File object is NOT null. In that case, the file is the directory for
+ * which the ExecutionEngine must write out the explain statements into
+ * semantically distinct files. For example, if the ExecutionEngine operates
+ * on a PhysicalPlan and an ExecutionPlan then there would be two separate
+ * files detailing each. The suffix param indicates the suffix of each of
+ * these file names.
+ *
+ * @param lp -- plan to explain
+ * @param pc -- context for explain processing
+ * @param ps -- print stream to write all output to (if dir param is null)
+ * @param format -- format to print explain
+ * @param verbose
+ * @param dir -- directory to write output to. if not null, write to files
+ * @param suffix -- if writing to files, suffix to be used for each file
+ *
+ *
+ * @throws PlanException
+ * @throws VisitorException
+ * @throws IOException
+ */
+ public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
+ String format, boolean verbose, File dir, String suffix)
+ throws PlanException, VisitorException, IOException;
+
+ /**
+ * Returns the DataStorage the ExecutionEngine is using.
+ *
+ * @return DataStorage the ExecutionEngine is using.
+ */
+ public DataStorage getDataStorage();
+
+ /**
+ * Creates a ScriptState object which will be accessible as a ThreadLocal
+ * variable inside the ScriptState class. This method is called when first
+ * initializing the ScriptState as to delegate to the ExecutionEngine the
+ * version of ScriptState to use to manage the execution at hand.
+ *
+ * @return ScriptState object to manage execution of the script
+ */
+ public ScriptState instantiateScriptState();
+
+ /**
+ * Returns the ExecutableManager to be used in Pig Streaming.
+ *
+ * @return ExecutableManager to be used in Pig Streaming.
+ */
+ public ExecutableManager getExecutableManager();
+
+ /**
+ * This method is called whenever a kill signal has been triggered in the
+ * current process, indicating that the user wishes to kill and exit the
+ * current processing. It is necessary to implement this method to ensure
+ * that we kill running jobs, and any other necessary cleanup.
+ */
+ public void kill() throws BackendException;
+
+ /**
+ * This method is called when a user requests to kill a job associated with
+ * the given job id. If it is not possible for a user to kill a job, throw an
+ * exception. It is imperative for the job id's being displayed to be unique
+ * such that the correct jobs are being killed when the user supplies the
+ * id.
+ *
+ * @throws BackendException
+ */
+ public void killJob(String jobID) throws BackendException;
+
+}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigStats;
+
+
+/**
+ *
+ * Provides core processing implementation for the backend of Pig
+ * if the chosen ExecutionEngine decides to delegate its work to this class.
+ * Also contains set of utility methods, including ones centered around
+ * Hadoop.
+ *
+ */
+public abstract class Launcher {
+ private static final Log log = LogFactory.getLog(Launcher.class);
+
+ long totalHadoopTimeSpent;
+ String newLine = "\n";
+ boolean pigException = false;
+ boolean outOfMemory = false;
+ static final String OOM_ERR = "OutOfMemoryError";
+
+ /**
+  * Initializes the time accounting, selects the line separator used to
+  * split backend stack traces, and calls {@link #reset()}.
+  */
+ protected Launcher() {
+     totalHadoopTimeSpent = 0;
+     // handle the windows portion of \r
+     // The prefix is compared with regionMatches(ignoreCase) rather than
+     // toUpperCase(): toUpperCase() uses the default locale, and under e.g.
+     // a Turkish locale "Windows" does not upper-case to "WINDOWS".
+     String osName = System.getProperty("os.name");
+     String windowsPrefix = "WINDOWS";
+     if (osName != null
+             && osName.regionMatches(true, 0, windowsPrefix, 0,
+                     windowsPrefix.length())) {
+         newLine = "\r\n";
+     }
+     reset();
+ }
+
+ /**
+ * Resets the state after a launch. The implementation in this class is
+ * empty. It is also invoked at the end of the Launcher constructor, so an
+ * override runs before the constructor body of the subclass has executed.
+ */
+ public void reset() {
+
+ }
+
+ /**
+ * Method to launch pig for hadoop either for a cluster's job tracker or for
+ * a local job runner. The only difference between the two is the job
+ * client. Depending on the pig context the job client will be initialized to
+ * one of the two. Launchers for other frameworks can override these methods.
+ * Given an input PhysicalPlan, it compiles it to get a MapReduce Plan. The
+ * MapReduce plan has multiple MapReduce operators, each one of which
+ * has to be run as a map reduce job with dependency information stored in
+ * the plan. It compiles the MROperPlan into a JobControl object. Each Map
+ * Reduce operator is converted into a Job and added to the JobControl
+ * object. Each Job also has a set of dependent Jobs that are created using
+ * the MROperPlan. The JobControl object is obtained from the
+ * JobControlCompiler. Then a new thread is spawned that submits these jobs
+ * while respecting the dependency information. The parent thread monitors
+ * the submitted jobs' progress and after it is complete, stops the
+ * JobControl thread.
+ *
+ * @param php
+ * the input PhysicalPlan to compile and launch
+ * @param grpName
+ * group name for the launch (NOTE(review): its use is not
+ * visible in this class; confirm against the implementations)
+ * @param pc
+ * the PigContext that decides which job client is used
+ * @return the PigStats produced by the launch
+ * @throws PlanException
+ * @throws VisitorException
+ * @throws IOException
+ * @throws ExecException
+ * @throws JobCreationException
+ */
+ public abstract PigStats launchPig(PhysicalPlan php, String grpName,
+ PigContext pc) throws Exception;
+
+ /**
+ * Explain how a pig job will be executed on the underlying infrastructure.
+ *
+ * @param pp
+ * PhysicalPlan to explain
+ * @param pc
+ * PigContext to use for configuration
+ * @param ps
+ * PrintStream to write output on.
+ * @param format
+ * Format to write in
+ * @param verbose
+ * Amount of information to print
+ * @throws PlanException
+ * @throws VisitorException
+ * @throws IOException
+ */
+ public abstract void explain(PhysicalPlan pp, PigContext pc,
+ PrintStream ps, String format, boolean verbose)
+ throws PlanException, VisitorException, IOException;
+
+ /**
+ * Kills the work started by this launcher.
+ * NOTE(review): presumably the launcher-side counterpart of
+ * ExecutionEngine.kill(); confirm against the implementations.
+ *
+ * @throws BackendException
+ */
+ public abstract void kill() throws BackendException;
+
+ /**
+ * Kills the job with the given id.
+ * NOTE(review): presumably the launcher-side counterpart of
+ * ExecutionEngine.killJob(String); confirm against the implementations.
+ *
+ * @param jobID
+ * id of the job to kill
+ * @param jobConf
+ * job configuration handed to the implementation
+ * @throws BackendException
+ */
+ public abstract void killJob(String jobID, JobConf jobConf)
+ throws BackendException;
+
+ protected boolean isComplete(double prog) {
+ return (int) (Math.ceil(prog)) == 1;
+ }
+
+ /**
+  * Collects diagnostics for a job and adds the time spent by its map and
+  * reduce tasks to the total Hadoop time.
+  * <p>
+  * If the job never got a job id, its message is logged, turned back into
+  * an exception and thrown wrapped in an ExecException. Otherwise the map
+  * and reduce task reports are fetched, their diagnostics are handed to
+  * {@link #getErrorMessages} and their durations are accumulated.
+  *
+  * @param job
+  *            the job to inspect
+  * @param jobClient
+  *            client used to fetch the task reports
+  * @param errNotDbg
+  *            passed through to getErrorMessages: true to treat diagnostics
+  *            as errors, false to only log them at debug level
+  * @param pigContext
+  *            supplies the "pig.logfile" property used for logging
+  * @throws ExecException
+  *             if the job was not submitted, if a task failure is found, or
+  *             if the reports of an unsuccessful job cannot be read
+  */
+ protected void getStats(Job job, JobClient jobClient, boolean errNotDbg,
+         PigContext pigContext) throws ExecException {
+     JobID MRJobID = job.getAssignedJobID();
+     String jobMessage = job.getMessage();
+     if (MRJobID == null) {
+         // no job id means the submission itself failed
+         Exception backendException;
+         try {
+             LogUtils.writeLog(
+                     "Backend error message during job submission",
+                     jobMessage,
+                     pigContext.getProperties().getProperty("pig.logfile"),
+                     log);
+             backendException = getExceptionFromString(jobMessage);
+         } catch (Exception e) {
+             int errCode = 2997;
+             String msg = "Unable to recreate exception from backend error: "
+                     + jobMessage;
+             // keep the parsing failure as the cause instead of dropping it
+             throw new ExecException(msg, errCode, PigException.BUG, e);
+         }
+         throw new ExecException(backendException);
+     }
+     try {
+         TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
+         getErrorMessages(mapRep, "map", errNotDbg, pigContext);
+         totalHadoopTimeSpent += computeTimeSpent(mapRep);
+         TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
+         getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
+         totalHadoopTimeSpent += computeTimeSpent(redRep);
+     } catch (IOException e) {
+         if (job.getState() == Job.SUCCESS) {
+             // if the job succeeded, let the user know that
+             // we were unable to get statistics
+             log.warn("Unable to get job related diagnostics");
+         } else {
+             throw new ExecException(e);
+         }
+     } catch (Exception e) {
+         throw new ExecException(e);
+     }
+ }
+
+ /**
+  * Sums, over all given task reports, the finish time minus the start time.
+  *
+  * @param taskReports
+  *            the reports to add up
+  * @return the accumulated difference between finish and start times
+  */
+ protected long computeTimeSpent(TaskReport[] taskReports) {
+     long total = 0;
+     for (int idx = 0; idx < taskReports.length; idx++) {
+         TaskReport report = taskReports[idx];
+         long elapsed = report.getFinishTime() - report.getStartTime();
+         total += elapsed;
+     }
+     return total;
+ }
+
+ protected void getErrorMessages(TaskReport reports[], String type,
+ boolean errNotDbg, PigContext pigContext) throws Exception {
+ for (int i = 0; i < reports.length; i++) {
+ String msgs[] = reports[i].getDiagnostics();
+ ArrayList<Exception> exceptions = new ArrayList<Exception>();
+ String exceptionCreateFailMsg = null;
+ boolean jobFailed = false;
+ float successfulProgress = 1.0f;
+ if (msgs.length > 0) {
+ // if the progress reported is not 1.0f then the map or reduce
+ // job failed
+ // this comparison is in place till Hadoop 0.20 provides methods
+ // to query
+ // job status
+ if (reports[i].getProgress() != successfulProgress) {
+ jobFailed = true;
+ }
+ Set<String> errorMessageSet = new HashSet<String>();
+ for (int j = 0; j < msgs.length; j++) {
+ if (!errorMessageSet.contains(msgs[j])) {
+ errorMessageSet.add(msgs[j]);
+ if (errNotDbg) {
+ // errNotDbg is used only for failed jobs
+ // keep track of all the unique exceptions
+ try {
+ LogUtils.writeLog("Backend error message",
+ msgs[j], pigContext.getProperties()
+ .getProperty("pig.logfile"),
+ log);
+ Exception e = getExceptionFromString(msgs[j]);
+ exceptions.add(e);
+ } catch (Exception e1) {
+ exceptionCreateFailMsg = msgs[j];
+
+ }
+ } else {
+ log.debug("Error message from task (" + type + ") "
+ + reports[i].getTaskID() + msgs[j]);
+ }
+ }
+ }
+ }
+ // if there are no valid exception that could be created, report
+ if ((exceptions.size() == 0) && (exceptionCreateFailMsg != null)) {
+ int errCode = 2997;
+ String msg = "Unable to recreate exception from backed error: "
+ + exceptionCreateFailMsg;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ // if its a failed job then check if there is more than one
+ // exception
+ // more than one exception implies possibly different kinds of
+ // failures
+ // log all the different failures and throw the exception
+ // corresponding
+ // to the first failure
+ if (jobFailed) {
+ if (exceptions.size() > 1) {
+ for (int j = 0; j < exceptions.size(); ++j) {
+ String headerMessage = "Error message from task ("
+ + type + ") " + reports[i].getTaskID();
+ LogUtils.writeLog(exceptions.get(j), pigContext
+ .getProperties().getProperty("pig.logfile"),
+ log, false, headerMessage, false, false);
+ }
+ throw exceptions.get(0);
+ } else if (exceptions.size() == 1) {
+ throw exceptions.get(0);
+ } else {
+ int errCode = 2115;
+ String msg = "Internal error. Expected to throw exception from the backend. Did not find any exception to throw.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+ }
+ }
+
+ /**
+  * Computes the progress of the jobs submitted through the JobControl
+  * object jc to the JobClient jobClient.
+  *
+  * @param jc
+  *            - The JobControl object that has been submitted
+  * @param jobClient
+  *            - The JobClient to which it has been submitted
+  * @return the number of successful jobs plus the progress of every
+  *         running job as reported by progressOfRunningJob
+  * @throws IOException
+  */
+ protected double calculateProgress(JobControl jc, JobClient jobClient)
+         throws IOException {
+     // every successfully finished job counts as one full unit
+     double total = 0.0 + jc.getSuccessfulJobs().size();
+
+     List<Job> running = jc.getRunningJobs();
+     for (Job current : running) {
+         total += progressOfRunningJob(current, jobClient);
+     }
+     return total;
+ }
+
+ /**
+  * Reports the progress of a single Job that belongs to a submitted
+  * JobControl object. The value covers this Job only, so callers have to
+  * scale it down by the number of jobs in the JobControl.
+  *
+  * @param j
+  *            - The Job for which progress is required
+  * @param jobClient
+  *            - the JobClient to which it has been submitted
+  * @return 1 or 0 when the client has no RunningJob for it (depending on
+  *         whether the Job state is SUCCESS), otherwise the mean of its
+  *         map and reduce progress
+  * @throws IOException
+  */
+ protected double progressOfRunningJob(Job j, JobClient jobClient)
+         throws IOException {
+     RunningJob runningJob = jobClient.getJob(j.getAssignedJobID());
+     if (runningJob == null) {
+         // the client does not know the job: rely on the recorded state
+         return j.getState() == Job.SUCCESS ? 1 : 0;
+     }
+     double mapFraction = runningJob.mapProgress();
+     double reduceFraction = runningJob.reduceProgress();
+     return (mapFraction + reduceFraction) / 2;
+ }
+
+ /**
+ * Returns the accumulated task time. The value starts at zero and grows in
+ * getStats by the summed (finish time - start time) of the map and reduce
+ * task reports of each inspected job.
+ *
+ * @return the total time accumulated so far
+ */
+ public long getTotalHadoopTimeSpent() {
+ return totalHadoopTimeSpent;
+ }
+
+ /**
+ *
+ * @param stackTraceLine
+ * The string representation of
+ * {@link Throwable#printStackTrace() printStackTrace} Handles
+ * internal PigException and its subclasses that override the
+ * {@link Throwable#toString() toString} method
+ * @return An exception object whose string representation of
+ * printStackTrace is the input stackTrace
+ * @throws Exception
+ */
+ public Exception getExceptionFromString(String stackTrace) throws Exception {
+ String[] lines = stackTrace.split(newLine);
+ Throwable t = getExceptionFromStrings(lines, 0);
+
+ if (!pigException) {
+ int errCode = 6015;
+ String msg = "During execution, encountered a Hadoop error.";
+ ExecException ee = new ExecException(msg, errCode,
+ PigException.REMOTE_ENVIRONMENT, t);
+ ee.setStackTrace(t.getStackTrace());
+ return ee;
+ } else {
+ pigException = false;
+ if (outOfMemory) {
+ outOfMemory = false;
+ int errCode = 6016;
+ String msg = "Out of memory.";
+ ExecException ee = new ExecException(msg, errCode,
+ PigException.REMOTE_ENVIRONMENT, t);
+ ee.setStackTrace(t.getStackTrace());
+ return ee;
+ }
+ return (Exception) t;
+ }
+ }
+
+ /**
+ * Rebuilds a Throwable from the lines of a printed stack trace. The
+ * exception named on the line at startingLineNum is instantiated through
+ * PigContext.instantiateFuncFromSpec, the following "at ..." lines become
+ * its stack trace elements, and any lines left after that are parsed
+ * recursively and attached as the cause. As a side effect the outOfMemory
+ * field is set when the exception name contains OutOfMemoryError and the
+ * pigException field is set when the class name of a frame contains
+ * "org.apache.pig.".
+ *
+ * @param stackTraceLines
+ * An array of strings that represent
+ * {@link Throwable#printStackTrace() printStackTrace} output,
+ * split by newline
+ * @param startingLineNum
+ * index of the line at which parsing starts
+ * @return An exception object whose string representation of
+ * printStackTrace is the input stackTrace
+ * @throws Exception
+ */
+ private Throwable getExceptionFromStrings(String[] stackTraceLines,
+ int startingLineNum) throws Exception {
+ /*
+ * parse the array of string and throw the appropriate exception first:
+ * from the line startingLineNum extract the exception name extract the
+ * message if any fourth: create the appropriate exception and return it
+ * An example of the stack trace:
+ * org.apache.pig.backend.executionengine.ExecException: ERROR 1075:
+ * Received a bytearray from the UDF. Cannot determine how to convert
+ * the bytearray to int. at
+ * org.apache.pig.backend.hadoop.executionengine
+ * .physicalLayer.expressionOperators.POCast.getNext(POCast.java:152) at
+ * org.apache.pig.backend.hadoop.executionengine.physicalLayer.
+ * expressionOperators.LessThanExpr.getNext(LessThanExpr.java:85) at
+ * org.apache.pig.backend.hadoop.executionengine.physicalLayer.
+ * relationalOperators.POFilter.getNext(POFilter.java:148) at
+ * org.apache.
+ * pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase
+ * .runPipeline(PigMapBase.java:184) at
+ * org.apache.pig.backend.hadoop.executionengine
+ * .mapReduceLayer.PigMapBase.map(PigMapBase.java:174) at
+ * org.apache.pig.
+ * backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map
+ * .map(PigMapOnly.java:65) at
+ * org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47) at
+ * org.apache.hadoop.mapred.MapTask.run(MapTask.java:227) at
+ * org.apache.hadoop
+ * .mapred.TaskTracker$Child.main(TaskTracker.java:2207)
+ */
+
+ // NOTE: every bound below uses length - 1, so the last array element is
+ // never read as an exception header or as a stack frame
+ if (stackTraceLines.length > 0
+ && startingLineNum < (stackTraceLines.length - 1)) {
+
+ // the regex for matching the exception class name; note the use of
+ // the $ for matching nested classes
+ String exceptionNameDelimiter = "(\\w+(\\$\\w+)?\\.)+\\w+";
+ Pattern exceptionNamePattern = Pattern
+ .compile(exceptionNameDelimiter);
+
+ // from the first line extract the exception name and the exception
+ // message
+ Matcher exceptionNameMatcher = exceptionNamePattern
+ .matcher(stackTraceLines[startingLineNum]);
+ String exceptionName = null;
+ String exceptionMessage = null;
+ if (exceptionNameMatcher.find()) {
+ exceptionName = exceptionNameMatcher.group();
+ /*
+ * note that the substring is from end + 2 the regex matcher
+ * ends at one position beyond the match in this case it will
+ * end at colon (:) the exception message will have a preceding
+ * space (after the colon (:))
+ */
+ if (exceptionName.contains(OOM_ERR)) {
+ outOfMemory = true;
+ }
+
+ // NOTE(review): the guard only requires one character after the
+ // match while the substring starts two characters after it; a
+ // header that ends right after the colon would throw
+ // StringIndexOutOfBoundsException
+ if (stackTraceLines[startingLineNum].length() > exceptionNameMatcher
+ .end()) {
+ exceptionMessage = stackTraceLines[startingLineNum]
+ .substring(exceptionNameMatcher.end() + 2);
+ }
+
+ ++startingLineNum;
+ }
+
+ // the exceptionName should not be null
+ if (exceptionName != null) {
+
+ ArrayList<StackTraceElement> stackTraceElements = new ArrayList<StackTraceElement>();
+
+ // Create stack trace elements for the remaining lines
+ String stackElementRegex = "\\s+at\\s+(\\w+(\\$\\w+)?\\.)+(\\<)?\\w+(\\>)?";
+ Pattern stackElementPattern = Pattern
+ .compile(stackElementRegex);
+ String pigExceptionRegex = "org\\.apache\\.pig\\.";
+ Pattern pigExceptionPattern = Pattern
+ .compile(pigExceptionRegex);
+ String moreElementRegex = "\\s+\\.\\.\\.\\s+\\d+\\s+more";
+ Pattern moreElementPattern = Pattern.compile(moreElementRegex);
+
+ int lineNum = startingLineNum;
+ for (; lineNum < (stackTraceLines.length - 1); ++lineNum) {
+ Matcher stackElementMatcher = stackElementPattern
+ .matcher(stackTraceLines[lineNum]);
+
+ if (stackElementMatcher.find()) {
+ StackTraceElement ste = getStackTraceElement(stackTraceLines[lineNum]);
+ stackTraceElements.add(ste);
+ String className = ste.getClassName();
+ Matcher pigExceptionMatcher = pigExceptionPattern
+ .matcher(className);
+ if (pigExceptionMatcher.find()) {
+ pigException = true;
+ }
+ } else {
+ // the first line that is not a frame ends this exception;
+ // a "... N more" line is skipped so that the cause header
+ // is the next line to be parsed
+ Matcher moreElementMatcher = moreElementPattern
+ .matcher(stackTraceLines[lineNum]);
+ if (moreElementMatcher.find()) {
+ ++lineNum;
+ }
+ break;
+ }
+ }
+
+ startingLineNum = lineNum;
+
+ // create the appropriate exception; setup the stack trace and
+ // message
+ Object object = PigContext
+ .instantiateFuncFromSpec(exceptionName);
+
+ if (object instanceof PigException) {
+ // extract the error code and message the regex for matching
+ // the custom format of ERROR <ERROR CODE>:
+ String errMessageRegex = "ERROR\\s+\\d+:";
+ Pattern errMessagePattern = Pattern
+ .compile(errMessageRegex);
+ // NOTE(review): exceptionMessage is still null here when the
+ // header line ends with the exception name; matcher(null)
+ // would then throw NullPointerException
+ Matcher errMessageMatcher = errMessagePattern
+ .matcher(exceptionMessage);
+
+ if (errMessageMatcher.find()) {
+ String errMessageStub = errMessageMatcher.group();
+ /*
+ * extract the actual exception message sans the ERROR
+ * <ERROR CODE>: again note that the matcher ends at the
+ * space following the colon (:) the exception message
+ * appears after the space and hence the end + 1
+ */
+ exceptionMessage = exceptionMessage
+ .substring(errMessageMatcher.end() + 1);
+
+ // the regex to match the error code which is a string of
+ // numerals
+ String errCodeRegex = "\\d+";
+ Pattern errCodePattern = Pattern.compile(errCodeRegex);
+ Matcher errCodeMatcher = errCodePattern
+ .matcher(errMessageStub);
+
+ String code = null;
+ if (errCodeMatcher.find()) {
+ code = errCodeMatcher.group();
+ }
+
+ // could receive a number format exception here but it
+ // will be propagated up the stack
+ int errCode;
+ if (code != null)
+ errCode = Integer.parseInt(code);
+ else
+ errCode = 2998;
+
+ // create the exception with the message and then set
+ // the error code and error source
+ FuncSpec funcSpec = new FuncSpec(exceptionName,
+ exceptionMessage);
+ object = PigContext.instantiateFuncFromSpec(funcSpec);
+ ((PigException) object).setErrorCode(errCode);
+ ((PigException) object).setErrorSource(PigException
+ .determineErrorSource(errCode));
+ } else { // else for if(errMessageMatcher.find())
+ /*
+ * did not find the error code which means that the
+ * PigException or its subclass is not returning the
+ * error code highly unlikely: should never be here
+ */
+ FuncSpec funcSpec = new FuncSpec(exceptionName,
+ exceptionMessage);
+ object = PigContext.instantiateFuncFromSpec(funcSpec);
+ ((PigException) object).setErrorCode(2997);// generic
+ // error
+ // code
+ ((PigException) object)
+ .setErrorSource(PigException.BUG);
+ }
+ } else { // else for if(object instanceof PigException)
+ // its not PigException; create the exception with the
+ // message
+ object = PigContext.instantiateFuncFromSpec(new FuncSpec(
+ exceptionName, exceptionMessage));
+ }
+
+ StackTraceElement[] steArr = new StackTraceElement[stackTraceElements
+ .size()];
+ ((Throwable) object).setStackTrace(stackTraceElements
+ .toArray(steArr));
+
+ // remaining lines describe the cause: parse them recursively
+ if (startingLineNum < (stackTraceLines.length - 1)) {
+ Throwable e = getExceptionFromStrings(stackTraceLines,
+ startingLineNum);
+ ((Throwable) object).initCause(e);
+ }
+
+ return (Throwable) object;
+ } else { // else for if(exceptionName != null)
+ int errCode = 2055;
+ String msg = "Did not find exception name to create exception from string: "
+ + Arrays.toString(stackTraceLines);
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ } else { // else for if(lines.length > 0)
+ int errCode = 2056;
+ String msg = "Cannot create exception from empty string.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ /**
+  * Regex for a fully qualified method name. Note the use of the $ for
+  * matching nested classes and the use of < and > for constructors.
+  * Compiled once instead of on every call.
+  */
+ private static final Pattern QUALIFIED_METHOD_NAME_PATTERN = Pattern
+         .compile("(\\w+(\\$\\w+)?\\.)+(<)?\\w+(>)?");
+
+ /**
+  * Turns one "at ..." line of a printed stack trace back into a
+  * StackTraceElement.
+  * <p>
+  * The line looks like
+  * {@code at org.apache.pig...PigMapOnly$Map.map(PigMapOnly.java:65)};
+  * leading white space and the "at" are ignored. The file name and line
+  * number are read from the text between the opening parenthesis that
+  * follows the method name and the last closing parenthesis, so trailing
+  * characters after the parenthesis (for example a stray carriage return)
+  * are tolerated. When the parenthesized part is missing, the file name is
+  * null and the line number is -1.
+  *
+  * @param line
+  *            the string representation of a stack trace returned by
+  *            {@link Throwable#printStackTrace() printStackTrace}
+  * @return the StackTraceElement object representing the stack trace
+  * @throws Exception
+  *             an ExecException with error code 2057 if no fully qualified
+  *             method name is found; a NumberFormatException if the line
+  *             number is not numeric
+  */
+ public StackTraceElement getStackTraceElement(String line) throws Exception {
+     Matcher nameMatcher = QUALIFIED_METHOD_NAME_PATTERN.matcher(line);
+     if (!nameMatcher.find()) {
+         int errCode = 2057;
+         String msg = "Did not find fully qualified method name to reconstruct stack trace: "
+                 + line;
+         throw new ExecException(msg, errCode, PigException.BUG);
+     }
+
+     // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.map
+     String qualifiedMethodName = nameMatcher.group();
+
+     // the last member is always the method name, everything before the
+     // last dot is the declaring class (the pattern guarantees a dot)
+     int lastDot = qualifiedMethodName.lastIndexOf('.');
+     String declaringClass = qualifiedMethodName.substring(0, lastDot);
+     String methodName = qualifiedMethodName.substring(lastDot + 1);
+
+     // (PigMapOnly.java:65) -> PigMapOnly.java and 65
+     String fileName = null;
+     int lineNumber = -1;
+     int open = line.indexOf('(', nameMatcher.end());
+     int close = line.lastIndexOf(')');
+     if (open >= 0 && close > open) {
+         String[] items = line.substring(open + 1, close).split(":");
+         if (items.length > 0) {
+             fileName = items[0];
+             if (items.length > 1) {
+                 lineNumber = Integer.parseInt(items[1]);
+             }
+         }
+     }
+     return new StackTraceElement(declaringClass, methodName, fileName,
+             lineNumber);
+ }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MRExecutionEngine.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketImplFactory;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.optimizer.UidResetter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
+import org.apache.pig.newplan.logical.visitor.StoreAliasSetter;
+import org.apache.pig.pen.POOptimizeDisabler;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+public class MRExecutionEngine implements ExecutionEngine {
+
+ public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+ private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+ private static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
+
+ private static final String HADOOP_SITE = "hadoop-site.xml";
+ private static final String CORE_SITE = "core-site.xml";
+ private static final String YARN_SITE = "yarn-site.xml";
+ private final Log log = LogFactory.getLog(getClass());
+ public static final String LOCAL = "local";
+
+ protected PigContext pigContext;
+
+ protected DataStorage ds;
+
+ @SuppressWarnings("deprecation")
+ protected JobConf jobConf;
+
+ // key: the operator key from the logical plan that originated the physical
+ // plan
+ // val: the operator key for the root of the physical plan
+ protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+
+ protected Map<Operator, PhysicalOperator> newLogToPhyMap;
+ private LogicalPlan newPreoptimizedPlan;
+
+ protected Launcher launcher;
+
+ /**
+  * Creates an engine bound to the given PigContext, with an empty
+  * logical-to-physical key map and a MapReduceLauncher. The data storage
+  * and the JobConf stay null until the init method has run.
+  *
+  * @param pigContext
+  *            the context this engine works for
+  */
+ public MRExecutionEngine(PigContext pigContext) {
+     this.pigContext = pigContext;
+
+     // both are to be set in the init method
+     ds = null;
+     jobConf = null;
+
+     logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+     launcher = new MapReduceLauncher();
+ }
+
+ /**
+ * Returns the JobConf built by the init method.
+ *
+ * @return the JobConf, or null if the engine has not been initialized yet
+ */
+ @SuppressWarnings("deprecation")
+ public JobConf getJobConf() {
+ return this.jobConf;
+ }
+
+ /**
+ * Returns the DataStorage created by the init method.
+ *
+ * @return the DataStorage, or null if the engine has not been initialized
+ * yet
+ */
+ @Override
+ public DataStorage getDataStorage() {
+ return this.ds;
+ }
+
+ @Override
+ public void init() throws ExecException {
+ init(this.pigContext.getProperties());
+ }
+
+ /**
+ * Builds the Hadoop JobConf for this engine and creates the data storage.
+ * In non-local mode the configuration is created either from the given
+ * properties (when pig.use.overriden.hadoop.configs is "true") or from the
+ * Hadoop site files on the classpath; in local mode the job tracker and
+ * file system properties are forced to local values. On return ds holds a
+ * new HDataStorage and jobConf holds the JobConf that was built.
+ *
+ * @param properties
+ * properties merged into the configuration; this object is
+ * modified
+ * @throws ExecException
+ * with error code 4010 if, in non-local mode without overridden
+ * configs, neither hadoop-site.xml nor core-site.xml is on the
+ * classpath
+ */
+ @SuppressWarnings({ "deprecation", "resource" })
+ private void init(Properties properties) throws ExecException {
+ // First set the ssh socket factory
+ setSSHFactory();
+
+ String cluster = null;
+ String nameNode = null;
+
+ // We need to build a configuration object first in the manner described
+ // below
+ // and then get back a properties object to inspect the
+ // JOB_TRACKER_LOCATION
+ // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only
+ // at
+ // the existing properties object, we may not get the right settings. So
+ // we want
+ // to read the configurations in the order specified below and only then
+ // look
+ // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
+
+ // Hadoop by default specifies two resources, loaded in-order from the
+ // classpath:
+ // 1. hadoop-default.xml : Read-only defaults for hadoop.
+ // 2. hadoop-site.xml: Site-specific configuration for a given hadoop
+ // installation.
+ // Now add the settings from "properties" object to override any
+ // existing properties
+ // All of the above is accomplished in the method call below
+
+ JobConf jc = null;
+ if (!this.pigContext.getExecType().isLocal()) {
+ // Check existence of user provided configs
+ String isHadoopConfigsOverriden = properties
+ .getProperty("pig.use.overriden.hadoop.configs");
+ if (isHadoopConfigsOverriden != null
+ && isHadoopConfigsOverriden.equals("true")) {
+ jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
+ } else {
+ // Check existence of hadoop-site.xml or core-site.xml in
+ // classpath
+ // if user provided confs are not being used
+ Configuration testConf = new Configuration();
+ ClassLoader cl = testConf.getClassLoader();
+ URL hadoop_site = cl.getResource(HADOOP_SITE);
+ URL core_site = cl.getResource(CORE_SITE);
+
+ if (hadoop_site == null && core_site == null) {
+ throw new ExecException(
+ "Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath)."
+ + " If you plan to use local mode, please put -x local option in command line",
+ 4010);
+ }
+ jc = new JobConf();
+ }
+ jc.addResource("pig-cluster-hadoop-site.xml");
+ jc.addResource(YARN_SITE);
+
+ // Trick to invoke static initializer of DistributedFileSystem to
+ // add hdfs-default.xml
+ // into configuration
+ new DistributedFileSystem();
+
+ // the method below alters the properties object by overriding the
+ // hadoop properties with the values from properties and recomputing
+ // the properties
+ recomputeProperties(jc, properties);
+ } else {
+ // If we are running in local mode we dont read the hadoop conf file
+ if (properties.getProperty("mapreduce.framework.name") == null) {
+ properties.setProperty("mapreduce.framework.name", "local");
+ }
+ properties.setProperty(JOB_TRACKER_LOCATION, LOCAL);
+ properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+ properties
+ .setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+
+ jc = new JobConf(false);
+ jc.addResource("core-default.xml");
+ jc.addResource("mapred-default.xml");
+ jc.addResource("yarn-default.xml");
+ recomputeProperties(jc, properties);
+ }
+
+ cluster = jc.get(JOB_TRACKER_LOCATION);
+ nameNode = jc.get(FILE_SYSTEM_LOCATION);
+ // fall back to the alternative file system key, read from the
+ // PigContext properties rather than from the JobConf
+ if (nameNode == null)
+ nameNode = (String) pigContext.getProperties().get(
+ ALTERNATIVE_FILE_SYSTEM_LOCATION);
+
+ // a job tracker address without a port gets the port 50020 appended
+ if (cluster != null && cluster.length() > 0) {
+ if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
+ cluster = cluster + ":50020";
+ }
+ properties.setProperty(JOB_TRACKER_LOCATION, cluster);
+ }
+
+ // a name node address without a port gets the port 8020 appended
+ if (nameNode != null && nameNode.length() > 0) {
+ if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
+ nameNode = nameNode + ":8020";
+ }
+ properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
+ }
+
+ log.info("Connecting to hadoop file system at: "
+ + (nameNode == null ? LOCAL : nameNode));
+ // constructor sets DEFAULT_REPLICATION_FACTOR_KEY
+ ds = new HDataStorage(properties);
+
+ // NOTE: this logs the value stored in the JobConf, which does not
+ // include a port appended to the local variable cluster above
+ if (cluster != null && !cluster.equalsIgnoreCase(LOCAL)) {
+ log.info("Connecting to map-reduce job tracker at: "
+ + jc.get(JOB_TRACKER_LOCATION));
+ }
+
+ // Set job-specific configuration knobs
+ jobConf = jc;
+ }
+
+ /**
+  * Prepares a logical plan and translates it into a physical plan.
+  * <p>
+  * Before anything touches the plan, a new {@code LogicalPlan} constructed
+  * from it is stored (returned later by {@link #getNewPlan()}). The plan is
+  * then run through the uid/schema resetters, the store-alias setter, the
+  * logical optimizer, the sort-info setter and (unless the context is in
+  * explain mode) the input/output file validator, and is finally translated.
+  * The logical-to-physical operator map produced by the translation is
+  * stored for {@link #getLogToPhyMap()}.
+  *
+  * @param plan
+  *            logical plan to compile; must not be null
+  * @param properties
+  *            not read by this method
+  * @return the physical plan produced by the translation visitor
+  * @throws FrontendException
+  *             with error code 2041 when {@code plan} is null, with error
+  *             code 2110 when the serialized set of disabled optimizer
+  *             rules cannot be deserialized, or as raised by any of the
+  *             visitors / the optimizer
+  */
+ @SuppressWarnings("unchecked")
+ public PhysicalPlan compile(LogicalPlan plan, Properties properties)
+         throws FrontendException {
+     if (plan == null) {
+         throw new FrontendException("No Plan to compile", 2041,
+                 PigException.BUG);
+     }
+
+     // Keep a plan built from the input before it is modified below.
+     newPreoptimizedPlan = new LogicalPlan(plan);
+
+     if (pigContext.inIllustrator) {
+         // disable all PO-specific optimizations
+         new POOptimizeDisabler(plan).visit();
+     }
+
+     new UidResetter(plan).visit();
+
+     // second argument: skip the duplicate uid check
+     new SchemaResetter(plan, true).visit();
+
+     // Rules disabled through the serialized property.
+     HashSet<String> rulesToSkip;
+     try {
+         String serializedRules = pigContext.getProperties().getProperty(
+                 PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
+         rulesToSkip = (HashSet<String>) ObjectSerializer
+                 .deserialize(serializedRules);
+     } catch (IOException ioe) {
+         throw new FrontendException(
+                 "Unable to deserialize optimizer rules.", 2110,
+                 PigException.BUG, ioe);
+     }
+     if (rulesToSkip == null) {
+         rulesToSkip = new HashSet<String>();
+     }
+
+     // Rules disabled through the comma-separated property.
+     String commaSeparatedRules = this.pigContext.getProperties()
+             .getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY);
+     if (commaSeparatedRules != null) {
+         for (String ruleName : Splitter.on(",").split(commaSeparatedRules)) {
+             rulesToSkip.add(ruleName);
+         }
+     }
+
+     // Rules that are always switched off while illustrating.
+     if (pigContext.inIllustrator) {
+         String[] illustratorDisabled = { "MergeForEach",
+                 "PartitionFilterOptimizer", "LimitOptimizer", "SplitFilter",
+                 "PushUpFilter", "MergeFilter", "PushDownForEachFlatten",
+                 "ColumnMapKeyPrune", "AddForEach",
+                 "GroupByConstParallelSetter" };
+         for (String ruleName : illustratorDisabled) {
+             rulesToSkip.add(ruleName);
+         }
+     }
+
+     new StoreAliasSetter(plan).visit();
+
+     // run optimizer
+     new LogicalPlanOptimizer(plan, 100, rulesToSkip).optimize();
+
+     // compute whether output data is sorted or not
+     new SortInfoSetter(plan).visit();
+
+     if (!pigContext.inExplain) {
+         // There is no validation framework in the new logical plan yet, so
+         // input/output files are validated here.
+         new InputOutputFileValidator(plan, pigContext).validate();
+     }
+
+     // translate new logical plan to physical plan
+     LogToPhyTranslationVisitor logToPhy = new LogToPhyTranslationVisitor(
+             plan);
+     logToPhy.setPigContext(pigContext);
+     logToPhy.visit();
+
+     newLogToPhyMap = logToPhy.getLogToPhyMap();
+     return logToPhy.getPhysicalPlan();
+ }
+
+ /**
+  * Returns the logical-to-physical operator map that {@code compile} last
+  * stored after translating a plan.
+  *
+  * @return the current value of the stored map
+  */
+ public Map<Operator, PhysicalOperator> getLogToPhyMap() {
+     return this.newLogToPhyMap;
+ }
+
+ /**
+  * Builds, for every {@code LOForEach} in the given plan, a map from each
+  * operator of its inner plan to the physical operator registered for it in
+  * the stored logical-to-physical map.
+  *
+  * @param plan
+  *            logical plan whose operators are scanned
+  * @return one entry per {@code LOForEach} found; the inner map's values are
+  *         whatever the stored map returns for each inner operator
+  */
+ public Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap(
+         LogicalPlan plan) {
+     Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> perForEach =
+             new HashMap<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>>();
+
+     for (Iterator<Operator> ops = plan.getOperators(); ops.hasNext();) {
+         Operator candidate = ops.next();
+         if (!(candidate instanceof LOForEach)) {
+             continue;
+         }
+
+         LOForEach forEach = (LOForEach) candidate;
+         Map<LogicalRelationalOperator, PhysicalOperator> innerMapping =
+                 new HashMap<LogicalRelationalOperator, PhysicalOperator>();
+
+         for (Iterator<Operator> innerOps = forEach.getInnerPlan()
+                 .getOperators(); innerOps.hasNext();) {
+             Operator inner = innerOps.next();
+             innerMapping.put((LogicalRelationalOperator) inner,
+                     newLogToPhyMap.get(inner));
+         }
+
+         perForEach.put(forEach, innerMapping);
+     }
+     return perForEach;
+ }
+
+ /**
+  * Returns the plan that {@code compile} stored before running any visitor
+  * or the optimizer on its input.
+  *
+  * @return the current value of the stored pre-optimization plan
+  */
+ public LogicalPlan getNewPlan() {
+     return this.newPreoptimizedPlan;
+ }
+
+ /**
+  * Installs a custom socket implementation factory when the
+  * {@code ssh.gateway} property is set to a non-empty value.
+  * <p>
+  * The factory is obtained reflectively from the static, no-argument
+  * {@code getFactory} method of
+  * {@code org.apache.pig.shock.SSHSocketImplFactory} and handed to
+  * {@link Socket#setSocketImplFactory(SocketImplFactory)}.
+  *
+  * @throws RuntimeException
+  *             wrapping any failure other than a {@link SocketException}
+  *             (class or method not found, reflective invocation errors,
+  *             other I/O errors)
+  */
+ protected void setSSHFactory() {
+     Properties properties = this.pigContext.getProperties();
+     String gateway = properties.getProperty("ssh.gateway");
+     if (gateway == null || gateway.length() == 0) {
+         return;
+     }
+     try {
+         Class<?> factoryClass = Class
+                 .forName("org.apache.pig.shock.SSHSocketImplFactory");
+         // getFactory is looked up with no parameters and called without an
+         // instance, so the receiver passed to invoke() must be null rather
+         // than an arbitrary boxed value.
+         SocketImplFactory factory = (SocketImplFactory) factoryClass
+                 .getMethod("getFactory").invoke(null);
+         Socket.setSocketImplFactory(factory);
+     } catch (SocketException e) {
+         // Best effort: Socket.setSocketImplFactory reports an already
+         // installed factory this way. Keep going, but leave a trace
+         // instead of dropping the exception silently.
+         log.debug("Socket implementation factory was not installed", e);
+     } catch (Exception e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Pushes the Pig properties into the given JobConf and then rebuilds the
+  * properties object from the resulting JobConf contents.
+  * <p>
+  * Nothing happens when either argument is null.
+  *
+  * @param jobConf
+  *            JobConf with appropriate hadoop resource files; receives every
+  *            Pig property except {@code user.name}
+  * @param properties
+  *            Pig properties that will override hadoop properties; cleared
+  *            and re-populated from {@code jobConf} by this call
+  */
+ @SuppressWarnings("deprecation")
+ protected void recomputeProperties(JobConf jobConf, Properties properties) {
+     if (jobConf == null || properties == null) {
+         return;
+     }
+
+     // Apply the user's properties on top of the hadoop configuration so
+     // that defaults and deprecation are handled by the JobConf.
+     for (Enumeration<Object> names = properties.keys(); names
+             .hasMoreElements();) {
+         String name = (String) names.nextElement();
+         String value = properties.getProperty(name);
+         // We do not put user.name, See PIG-1419
+         if ("user.name".equals(name)) {
+             continue;
+         }
+         jobConf.set(name, value);
+     }
+
+     // Drop the user defined properties and mirror the JobConf back.
+     properties.clear();
+     for (Map.Entry<String, String> confEntry : jobConf) {
+         properties.put(confEntry.getKey(), confEntry.getValue());
+     }
+ }
+
+ @Override
+ public ScriptState instantiateScriptState() {
+ return new MRScriptState(UUID.randomUUID().toString());
+ }
+
+ /**
+  * Compiles the logical plan and hands the resulting physical plan to the
+  * launcher. The launcher is reset afterwards whether or not the launch
+  * succeeded.
+  * <p>
+  * Note that {@code pc} is only used to supply the properties passed to
+  * {@code compile}; the launch itself runs with this engine's own
+  * {@code pigContext}.
+  *
+  * @param lp
+  *            logical plan to run
+  * @param grpName
+  *            group name forwarded to the launcher
+  * @param pc
+  *            context whose properties are passed to {@code compile}
+  * @return the statistics returned by the launcher
+  * @throws FrontendException
+  *             rethrown unchanged
+  * @throws ExecException
+  *             rethrown unchanged, or wrapping any other exception
+  */
+ @Override
+ public PigStats launchPig(LogicalPlan lp, String grpName, PigContext pc)
+         throws FrontendException, ExecException {
+     try {
+         PhysicalPlan physicalPlan = compile(lp, pc.getProperties());
+         return launcher.launchPig(physicalPlan, grpName, pigContext);
+     } catch (ExecException execFailure) {
+         throw execFailure;
+     } catch (FrontendException frontendFailure) {
+         throw frontendFailure;
+     } catch (Exception unexpected) {
+         throw new ExecException(unexpected);
+     } finally {
+         launcher.reset();
+     }
+ }
+
+ /**
+  * Compiles the logical plan and prints the physical plan and the
+  * launcher's execution plan.
+  * <p>
+  * When {@code file} is null both plans are written to {@code ps}.
+  * Otherwise they are written to the files
+  * {@code physical_plan-<suffix>} and {@code exec_plan-<suffix>} created
+  * under {@code file}.
+  * <p>
+  * Only streams opened by this method are closed. A caller-supplied
+  * {@code ps} is flushed but left open, so that passing e.g. standard
+  * output does not leave it closed for the rest of the process.
+  *
+  * @param lp
+  *            logical plan to explain
+  * @param pc
+  *            context whose properties are passed to {@code compile}
+  * @param ps
+  *            stream used for both plans when {@code file} is null
+  * @param format
+  *            output format forwarded to the plan printers
+  * @param verbose
+  *            verbosity flag forwarded to the plan printers
+  * @param file
+  *            parent directory for the output files, or null to use
+  *            {@code ps}
+  * @param suffix
+  *            appended to the output file names
+  * @throws IOException
+  *             if an output file cannot be opened
+  */
+ @Override
+ public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
+         String format, boolean verbose, File file, String suffix)
+         throws PlanException, VisitorException, IOException,
+         FrontendException {
+
+     PrintStream pps = ps;
+     PrintStream eps = ps;
+
+     try {
+         if (file != null) {
+             pps = new PrintStream(new File(file, "physical_plan-" + suffix));
+             eps = new PrintStream(new File(file, "exec_plan-" + suffix));
+         }
+
+         PhysicalPlan pp = compile(lp, pc.getProperties());
+         pp.explain(pps, format, verbose);
+
+         MapRedUtil.checkLeafIsStore(pp, pigContext);
+         launcher.explain(pp, pigContext, eps, format, verbose);
+     } finally {
+         try {
+             launcher.reset();
+         } finally {
+             // Close only what was opened here; the identity comparison
+             // also covers the case where opening the second file failed
+             // after the first one had been opened.
+             if (pps != ps) {
+                 pps.close();
+             } else if (ps != null) {
+                 // The caller owns ps: make the output visible, but do not
+                 // close it.
+                 ps.flush();
+             }
+             if (eps != ps) {
+                 eps.close();
+             }
+         }
+     }
+ }
+
+ /**
+  * Returns the current JobConf converted to a {@code Properties} object via
+  * {@code ConfigurationUtil.toProperties}.
+  *
+  * @return the converted configuration, or null when no JobConf has been
+  *         set yet
+  */
+ @Override
+ public Properties getConfiguration() {
+     return (jobConf != null) ? ConfigurationUtil.toProperties(jobConf)
+             : null;
+ }
+
+ /**
+  * Re-initializes this engine by passing the given properties to
+  * {@code init}.
+  *
+  * @param newConfiguration
+  *            properties handed to {@code init}
+  * @throws ExecException
+  *             if {@code init} fails
+  */
+ @Override
+ public void setConfiguration(Properties newConfiguration)
+         throws ExecException {
+     this.init(newConfiguration);
+ }
+
+ /**
+  * Sets a single property through the JobConf and then rebuilds the Pig
+  * context's properties from it.
+  * <p>
+  * PIG-2508: properties need to be managed through the JobConf, but all
+  * other code reads them from the context's properties. So the current
+  * properties are first copied onto the JobConf (except {@code user.name}),
+  * the new value is set on the JobConf, and the properties are cleared and
+  * re-populated from the updated JobConf.
+  *
+  * @param property
+  *            name of the property to set
+  * @param value
+  *            value to set
+  */
+ @Override
+ public void setProperty(String property, String value) {
+     Properties properties = pigContext.getProperties();
+
+     // set current properties on jobConf
+     for (Enumeration<Object> names = pigContext.getProperties().keys(); names
+             .hasMoreElements();) {
+         String name = (String) names.nextElement();
+         String current = properties.getProperty(name);
+         // We do not put user.name, See PIG-1419
+         if ("user.name".equals(name)) {
+             continue;
+         }
+         jobConf.set(name, current);
+     }
+
+     // set new value, JobConf will handle deprecation etc.
+     jobConf.set(property, value);
+
+     // re-initialize to reflect updated JobConf
+     properties.clear();
+     for (Map.Entry<String, String> confEntry : jobConf) {
+         properties.put(confEntry.getKey(), confEntry.getValue());
+     }
+ }
+
+ @Override
+ public ExecutableManager getExecutableManager() {
+ return new HadoopExecutableManager();
+ }
+
+ /**
+  * Forwards the kill request to the launcher; does nothing when no launcher
+  * is set.
+  *
+  * @throws BackendException
+  *             if the launcher's kill fails
+  */
+ @Override
+ public void kill() throws BackendException {
+     if (launcher == null) {
+         return;
+     }
+     launcher.kill();
+ }
+
+ /**
+  * Asks the launcher to kill the given job, passing along the current
+  * JobConf; does nothing when no launcher is set.
+  *
+  * @param jobID
+  *            identifier of the job to kill
+  * @throws BackendException
+  *             if the launcher's killJob fails
+  */
+ @Override
+ public void killJob(String jobID) throws BackendException {
+     if (launcher == null) {
+         return;
+     }
+     launcher.killJob(jobID, jobConf);
+ }
+
+}