Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/23 18:57:20 UTC
svn commit: r767974 [1/4] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/executionengine/util/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/execu...
Author: pradeepkth
Date: Thu Apr 23 16:57:16 2009
New Revision: 767974
URL: http://svn.apache.org/viewvc?rev=767974&view=rev
Log:
PIG-627: multiquery support phase 1 and phase 2 (hagleitn and Richard Ding via pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/DotPlanDumper.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanDumper.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/PlanDumper.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/data/passwd
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd
hadoop/pig/trunk/test/org/apache/pig/test/data/passwd2
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd2
hadoop/pig/trunk/test/org/apache/pig/test/data/test.ppf
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test.ppf
hadoop/pig/trunk/test/org/apache/pig/test/data/test_broken.ppf
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test_broken.ppf
hadoop/pig/trunk/test/org/apache/pig/test/data/testsub.pig
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsub.pig
hadoop/pig/trunk/test/org/apache/pig/test/data/testsubnested_exec.pig
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_exec.pig
hadoop/pig/trunk/test/org/apache/pig/test/data/testsubnested_run.pig
- copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_run.pig
Removed:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
Modified:
hadoop/pig/trunk/ (props changed)
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java
hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java
hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt (props changed)
Propchange: hadoop/pig/trunk/
------------------------------------------------------------------------------
svn:mergeinfo = /hadoop/pig/branches/multiquery:741727-767731
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr 23 16:57:16 2009
@@ -595,3 +595,4 @@
PIG-284: target for building source jar (oae via olgan)
+ PIG-627: multiquery support phase 1 and phase 2 (hagleitn and Richard Ding via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Thu Apr 23 16:57:16 2009
@@ -99,6 +99,7 @@
opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
+ opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
@@ -121,6 +122,8 @@
//by default warning aggregation is on
properties.setProperty("aggregate.warning", ""+true);
+ properties.setProperty("opt.multiquery", ""+true);
+
char opt;
while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
switch (opt) {
@@ -191,6 +194,11 @@
case 'm':
paramFiles.add(opts.getValStr());
break;
+
+ case 'M':
+ // turns off multiquery optimization
+ properties.setProperty("opt.multiquery",""+false);
+ break;
case 'o':
// TODO sgroschupf using system properties is always a very bad idea
@@ -510,6 +518,7 @@
System.out.println(" -w, -warning turn warning on; also turns warning aggregation off");
System.out.println(" -x, -exectype local|mapreduce, mapreduce is default");
+ System.out.println(" -M, -no_multiquery turn multiquery optimization off; multiquery is on by default");
}
private static String validateLogFile(String logFileName, String scriptName) {
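For reference, the new -M switch simply presets the "opt.multiquery" property to false before the remaining options are applied. A minimal sketch of the programmatic equivalent, using only the property name and the PigServer(ExecType, Properties) constructor shown elsewhere in this commit (script text and paths are illustrative):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class NoMultiQuery {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // same effect as "pig -M" / "-no_multiquery"
            props.setProperty("opt.multiquery", "" + false);
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            pig.registerQuery("A = load 'input';");
            pig.registerQuery("store A into 'output';"); // runs immediately, pre-multiquery behavior
        }
    }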
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Apr 23 16:57:16 2009
@@ -34,7 +34,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-
+import java.util.Stack;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -50,10 +51,12 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.PlanSetter;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -65,6 +68,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.logicalLayer.LODefine;
@@ -99,24 +103,42 @@
throw new PigException(msg, errCode, PigException.BUG);
}
-
- Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
- Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
- Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
- PigContext pigContext;
+ /*
+ * The data structure to support grunt shell operations.
+ * The grunt shell can only work on one graph at a time.
+ * If a script is contained inside another script, the grunt
+ * shell first saves the current graph on the stack and works
+ * on a new graph. After the nested script is done, the grunt
+ * shell pops the saved graph off the stack and continues working on it.
+ */
+ private Stack<Graph> graphs = new Stack<Graph>();
+ /*
+ * The current Graph the grunt shell is working on.
+ */
+ private Graph currDAG;
+
+ private PigContext pigContext;
+
+ private static int scopeCounter = 0;
private String scope = constructScope();
+
private ArrayList<String> cachedScript = new ArrayList<String>();
private boolean aggregateWarning = true;
+ private boolean isMultiQuery = true;
private String constructScope() {
// scope serves for now as a session id
- // scope = user_id + "-" + time_stamp;
- String user = System.getProperty("user.name", "DEFAULT_USER_ID");
- String date = (new Date()).toString();
-
- return user + "-" + date;
+ // String user = System.getProperty("user.name", "DEFAULT_USER_ID");
+ // String date = (new Date()).toString();
+
+ // scope is not really used in the system right now. It will
+ // however make your explain statements look lengthy if set to
+ // username-date. For now let's simplify the scope; if a real
+ // scope is needed again, we might need to update all the
+ // operators to not include scope in their name().
+ return ""+(++scopeCounter);
}
public PigServer(String execTypeString) throws ExecException, IOException {
@@ -128,7 +150,7 @@
}
public PigServer(ExecType execType, Properties properties) throws ExecException {
- this(new PigContext(execType, properties), true);
+ this(new PigContext(execType, properties));
}
public PigServer(PigContext context) throws ExecException {
@@ -137,17 +159,16 @@
public PigServer(PigContext context, boolean connect) throws ExecException {
this.pigContext = context;
- if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
- setJobName("DefaultJobName") ;
- }
+ currDAG = new Graph(false);
aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+
if (connect) {
pigContext.connect();
}
}
-
+
public PigContext getPigContext(){
return pigContext;
}
@@ -159,7 +180,84 @@
public void debugOff() {
pigContext.debug = false;
}
-
+
+ /**
+ * Starts batch execution mode.
+ */
+ public void setBatchOn() {
+ log.info("Create a new graph.");
+
+ if (currDAG != null) {
+ graphs.push(currDAG);
+ }
+ currDAG = new Graph(isMultiQuery);
+ }
+
+ /**
+ * Retrieve the current execution mode.
+ *
+ * @return true if the execution mode is batch; false otherwise.
+ */
+ public boolean isBatchOn() {
+ // Batch is on when there is at least one graph on the
+ // stack. That gives the right response even if multiquery was
+ // turned off.
+ return graphs.size() > 0;
+ }
+
+ /**
+ * Returns whether the current batch is empty, i.e. there is nothing left to process.
+ * @throws FrontendException
+ * @return true if there are no stores to process in the current
+ * batch, false otherwise.
+ */
+ public boolean isBatchEmpty() throws FrontendException {
+ if (currDAG == null) {
+ int errCode = 1083;
+ String msg = "setBatchOn() must be called first.";
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+
+ return currDAG.isBatchEmpty();
+ }
+
+ /**
+ * Submits a batch of Pig commands for execution.
+ *
+ * @throws FrontendException
+ * @throws ExecException
+ */
+ public void executeBatch() throws FrontendException, ExecException {
+ if (!isMultiQuery) {
+ // ignore if multiquery is off
+ return;
+ }
+
+ if (currDAG == null || !isBatchOn()) {
+ int errCode = 1083;
+ String msg = "setBatchOn() must be called first.";
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+
+ currDAG.execute();
+ }
+
+ /**
+ * Discards a batch of Pig commands.
+ *
+ * @throws FrontendException
+ * @throws ExecException
+ */
+ public void discardBatch() throws FrontendException {
+ if (currDAG == null || !isBatchOn()) {
+ int errCode = 1083;
+ String msg = "setBatchOn() must be called first.";
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+
+ currDAG = graphs.pop();
+ }
+
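A hedged usage sketch of the batch API added above; the script lines and paths are made up, but the calls follow the new signatures:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class BatchDemo {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE, new Properties());
            pig.setBatchOn();                        // push a fresh Graph
            pig.registerQuery("A = load 'input';");
            pig.registerQuery("store A into 'out1';");
            pig.registerQuery("B = filter A by $0 > 10;");
            pig.registerQuery("store B into 'out2';");
            if (pig.isBatchEmpty()) {
                pig.discardBatch();                  // nothing pending; pop the saved graph
            } else {
                pig.executeBatch();                  // run both stores as one multiquery plan
            }
        }
    }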
/**
* Add a path to be skipped while automatically shipping binaries for
* streaming.
@@ -274,99 +372,20 @@
* line number of the query within the whole script
* @throws IOException
*/
- public void registerQuery(String query, int startLine) throws IOException {
-
- LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, aliasOp);
- // store away the query for use in cloning later
- cachedScript .add(query);
-
- if (lp.getLeaves().size() == 1)
- {
- LogicalOperator op = lp.getSingleLeafPlanOutputOp();
- // No need to do anything about DEFINE
- if (op instanceof LODefine) {
- return;
- }
-
- // Check if we just processed a LOStore i.e. STORE
- if (op instanceof LOStore) {
- try{
- execute(null);
- } catch (Exception e) {
- int errCode = 1002;
- String msg = "Unable to store alias " + op.getOperatorKey().getId();
- throw new FrontendException(msg, errCode, PigException.INPUT, e);
- }
- }
- }
+ public void registerQuery(String query, int startLine) throws IOException {
+ currDAG.registerQuery(query, startLine);
}
-
- private LogicalPlan parseQuery(String query, int startLine, Map<LogicalOperator, LogicalPlan> aliasesMap,
- Map<OperatorKey, LogicalOperator> opTableMap, Map<String, LogicalOperator> aliasOpMap) throws IOException {
- if(query != null) {
- query = query.trim();
- if(query.length() == 0) return null;
- }else {
- return null;
- }
- try {
- return new LogicalPlanBuilder(pigContext).parse(scope, query,
- aliasesMap, opTableMap, aliasOpMap, startLine);
- } catch (ParseException e) {
- //throw (IOException) new IOException(e.getMessage()).initCause(e);
- PigException pe = LogUtils.getPigException(e);
- int errCode = 1000;
- String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
- throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
- }
- }
-
+
public LogicalPlan clonePlan(String alias) throws IOException {
- // There are two choices on how we clone the logical plan
- // 1 - we really clone each operator and connect up the cloned operators
- // 2 - we cache away the script till the point we need to clone
- // and then simply re-parse the script.
- // The latter approach is used here
- // FIXME: There is one open issue with this now:
- // Consider the following script:
- // A = load 'file:/somefile';
- // B = filter A by $0 > 10;
- // store B into 'bla';
- // rm 'file:/somefile';
- // A = load 'file:/someotherfile'
- // when we try to clone - we try to reparse
- // from the beginning and currently the parser
- // checks for file existence of files in the load
- // in the case where the file is a local one -i.e. with file: prefix
- // This will be a known issue now and we will need to revisit later
-
- // parse each line of the cached script and the
- // final logical plan is the clone that we want
- LogicalPlan lp = null;
- int lineNumber = 1;
- // create data structures needed for parsing
- Map<LogicalOperator, LogicalPlan> cloneAliases = new HashMap<LogicalOperator, LogicalPlan>();
- Map<OperatorKey, LogicalOperator> cloneOpTable = new HashMap<OperatorKey, LogicalOperator>();
- Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, LogicalOperator>();
- for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); lineNumber++) {
- lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, cloneAliasOp);
- }
-
- if(alias == null) {
- // a store prompted the execution - so return
- // the entire logical plan
- return lp;
- } else {
- // return the logical plan corresponding to the
- // alias supplied
- LogicalOperator op = cloneAliasOp.get(alias);
- if(op == null) {
- int errCode = 1003;
- String msg = "Unable to find an operator for alias " + alias;
- throw new FrontendException(msg, errCode, PigException.INPUT);
- }
- return cloneAliases.get(op);
+ Graph graph = currDAG.clone();
+
+ if (graph == null) {
+ int errCode = 2127;
+ String msg = "Cloning of plan failed.";
+ throw new FrontendException(msg, errCode, PigException.BUG);
}
+
+ return graph.getPlan(alias);
}
public void registerQuery(String query) throws IOException {
@@ -378,7 +397,7 @@
GruntParser grunt = new GruntParser(new FileReader(new File(fileName)));
grunt.setInteractive(false);
grunt.setParams(this);
- grunt.parseStopOnError();
+ grunt.parseStopOnError(true);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -391,7 +410,7 @@
}
public void printAliases () throws FrontendException {
- System.out.println("aliases: " + aliasOp.keySet());
+ System.out.println("aliases: " + currDAG.getAliasOp().keySet());
}
public Schema dumpSchema(String alias) throws IOException{
@@ -410,7 +429,7 @@
}
public void setJobName(String name){
- pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
+ currDAG.setJobName(name);
}
/**
@@ -419,14 +438,15 @@
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
try {
- LogicalOperator op = aliasOp.get(id);
+ LogicalOperator op = currDAG.getAliasOp().get(id);
if(null == op) {
int errCode = 1003;
String msg = "Unable to find an operator for alias " + id;
throw new FrontendException(msg, errCode, PigException.INPUT);
}
-// ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+
ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
+
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -460,7 +480,7 @@
String id,
String filename,
String func) throws IOException{
- if (!aliasOp.containsKey(id))
+ if (!currDAG.getAliasOp().containsKey(id))
throw new IOException("Invalid alias: " + id);
try {
@@ -517,42 +537,51 @@
*/
public void explain(String alias,
PrintStream stream) throws IOException {
+ explain(alias, "text", true, false, stream, stream, stream);
+ }
+
+ /**
+ * Provide information on how a pig query will be executed.
+ * @param alias Name of alias to explain.
+ * @param format Format in which the explain should be printed
+ * @param verbose Controls the amount of information printed
+ * @param markAsExecute When set will treat the explain like a
+ * call to execute in the respect that all the pending stores are
+ * marked as complete.
+ * @param lps Stream to print the logical tree
+ * @param pps Stream to print the physical tree
+ * @param eps Stream to print the execution tree
+ * @throws IOException if the requested alias cannot be found.
+ */
+ public void explain(String alias,
+ String format,
+ boolean verbose,
+ boolean markAsExecute,
+ PrintStream lps,
+ PrintStream pps,
+ PrintStream eps) throws IOException {
try {
- LogicalPlan lp = compileLp(alias);
-
- // MRCompiler needs a store to be the leaf - hence
- // add a store to the plan to explain
-
- // figure out the leaf to which the store needs to be added
- List<LogicalOperator> leaves = lp.getLeaves();
- LogicalOperator leaf = null;
- if(leaves.size() == 1) {
- leaf = leaves.get(0);
- } else {
- for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
- LogicalOperator leafOp = it.next();
- if(leafOp.getAlias().equals(alias))
- leaf = leafOp;
- }
+ pigContext.inExplain = true;
+ LogicalPlan lp = getStorePlan(alias);
+ if (lp.size() == 0) {
+ lps.println("Logical plan is empty.");
+ pps.println("Physical plan is empty.");
+ eps.println("Execution plan is empty.");
+ return;
+ }
+ PhysicalPlan pp = compilePp(lp);
+ lp.explain(lps, format, verbose);
+ pp.explain(pps, format, verbose);
+ pigContext.getExecutionEngine().explain(pp, eps, format, verbose);
+ if (markAsExecute) {
+ currDAG.markAsExecuted();
}
-
- LogicalPlan storePlan = QueryParser.generateStorePlan(
- scope, lp, "fakefile", PigStorage.class.getName(), leaf);
- stream.println("Logical Plan:");
- LOPrinter lv = new LOPrinter(stream, storePlan);
- lv.visit();
-
- PhysicalPlan pp = compilePp(storePlan);
- stream.println("-----------------------------------------------");
- stream.println("Physical Plan:");
-
- stream.println("-----------------------------------------------");
- pigContext.getExecutionEngine().explain(pp, stream);
-
} catch (Exception e) {
int errCode = 1067;
String msg = "Unable to explain alias " + alias;
throw new FrontendException(msg, errCode, PigException.INPUT, e);
+ } finally {
+ pigContext.inExplain = false;
}
}
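A small sketch of the new overload; the "dot" format is assumed here to match the Dot*Printer classes added in this commit, and the output files are illustrative:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    import org.apache.pig.PigServer;

    public class ExplainDemo {
        static void explainToDot(PigServer pig, String alias) throws IOException {
            PrintStream lps = new PrintStream(new FileOutputStream("logical.dot"));
            PrintStream pps = new PrintStream(new FileOutputStream("physical.dot"));
            PrintStream eps = new PrintStream(new FileOutputStream("mapreduce.dot"));
            // verbose=true, markAsExecute=false: explain without marking pending stores done
            pig.explain(alias, "dot", true, false, lps, pps, eps);
        }
    }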
@@ -648,10 +677,10 @@
public Map<String, LogicalPlan> getAliases() {
Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
- for(LogicalOperator op: this.aliases.keySet()) {
+ for(LogicalOperator op: currDAG.getAliases().keySet()) {
String alias = op.getAlias();
if(null != alias) {
- aliasPlans.put(alias, this.aliases.get(op));
+ aliasPlans.put(alias, currDAG.getAliases().get(op));
}
}
return aliasPlans;
@@ -667,11 +696,10 @@
}
public Set<String> getAliasKeySet() {
- return aliasOp.keySet();
+ return currDAG.getAliasOp().keySet();
}
public Map<LogicalOperator, DataBag> getExamples(String alias) {
- //LogicalPlan plan = aliases.get(aliasOp.get(alias));
LogicalPlan plan = null;
try {
plan = clonePlan(alias);
@@ -684,15 +712,48 @@
ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
return exgen.getExamples();
}
+
+ private LogicalPlan getStorePlan(String alias) throws IOException {
+ LogicalPlan lp = compileLp(alias);
+
+ if (!isBatchOn() || alias != null) {
+ // MRCompiler needs a store to be the leaf - hence
+ // add a store to the plan to explain
+
+ // figure out the leaves to which stores need to be added
+ List<LogicalOperator> leaves = lp.getLeaves();
+ LogicalOperator leaf = null;
+ if(leaves.size() == 1) {
+ leaf = leaves.get(0);
+ } else {
+ for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+ LogicalOperator leafOp = it.next();
+ if(leafOp.getAlias().equals(alias))
+ leaf = leafOp;
+ }
+ }
+
+ lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
+ PigStorage.class.getName(), leaf);
+ }
+
+ return lp;
+ }
private ExecJob execute(String alias) throws FrontendException, ExecException {
- ExecJob job = null;
-// lp.explain(System.out, System.err);
LogicalPlan typeCheckedLp = compileLp(alias);
-
+
+ if (typeCheckedLp.size() == 0) {
+ return null;
+ }
+
+ LogicalOperator op = typeCheckedLp.getLeaves().get(0);
+ if (op instanceof LODefine) {
+ log.info("Skip execution of DEFINE-only logical plan.");
+ return null;
+ }
+
return executeCompiledLogicalPlan(typeCheckedLp);
-// typeCheckedLp.explain(System.out, System.err);
-
}
private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
@@ -717,14 +778,14 @@
// create a clone of the logical plan and give it
// to the operations below
LogicalPlan lpClone;
+
try {
- lpClone = clonePlan(alias);
+ lpClone = clonePlan(alias);
} catch (IOException e) {
int errCode = 2001;
String msg = "Unable to clone plan before compiling";
throw new FrontendException(msg, errCode, PigException.BUG, e);
}
-
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lpClone);
@@ -788,13 +849,13 @@
private LogicalPlan getPlanFromAlias(
String alias,
String operation) throws FrontendException {
- LogicalOperator lo = aliasOp.get(alias);
+ LogicalOperator lo = currDAG.getAliasOp().get(alias);
if (lo == null) {
int errCode = 1004;
String msg = "No alias " + alias + " to " + operation;
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
- LogicalPlan lp = aliases.get(lo);
+ LogicalPlan lp = currDAG.getAliases().get(lo);
if (lp == null) {
int errCode = 1005;
String msg = "No plan for " + alias + " to " + operation;
@@ -803,5 +864,266 @@
return lp;
}
+ /*
+ * This class holds the internal states of a grunt shell session.
+ */
+ private class Graph {
+
+ private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+
+ private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+
+ private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+
+ private List<String> scriptCache = new ArrayList<String>();
+
+ // the fileNameMap contains filename to canonical filename
+ // mappings. This is done so we can reparse the cached script
+ // and remember the translation (current directory might only
+ // be correct during the first parse).
+ private Map<String, String> fileNameMap = new HashMap<String, String>();
+
+ private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
+
+ private Set<LOLoad> loadOps = new HashSet<LOLoad>();
+
+ private String jobName;
+
+ private boolean batchMode;
+ private int processedStores;
+
+ private int ignoreNumStores;
+
+ private LogicalPlan lp;
+
+ Graph(boolean batchMode) {
+ this.batchMode = batchMode;
+ this.processedStores = 0;
+ this.ignoreNumStores = 0;
+ this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME,
+ PigContext.JOB_NAME_PREFIX+":DefaultJobName");
+ this.lp = new LogicalPlan();
+ }
+
+ Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
+
+ Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
+
+ Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
+
+ List<String> getScriptCache() { return scriptCache; }
+
+ boolean isBatchOn() { return batchMode; }
+
+ boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
+
+ void execute() throws ExecException, FrontendException {
+ pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
+ PigServer.this.execute(null);
+ processedStores = storeOpTable.keySet().size();
+ }
+
+ void markAsExecuted() {
+ processedStores = storeOpTable.keySet().size();
+ }
+
+ void setJobName(String name) {
+ jobName = PigContext.JOB_NAME_PREFIX+":"+name;
+ }
+
+ LogicalPlan getPlan(String alias) throws IOException {
+ LogicalPlan plan = lp;
+
+ if (alias != null) {
+ LogicalOperator op = aliasOp.get(alias);
+ if(op == null) {
+ int errCode = 1003;
+ String msg = "Unable to find an operator for alias " + alias;
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+ plan = aliases.get(op);
+ }
+ return plan;
+ }
+
+ void registerQuery(String query, int startLine) throws IOException {
+
+ LogicalPlan tmpLp = parseQuery(query, startLine);
+
+ // store away the query for use in cloning later
+ scriptCache.add(query);
+ if (tmpLp.getLeaves().size() == 1) {
+ LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
+
+ // Check if we just processed a LOStore i.e. STORE
+ if (op instanceof LOStore) {
+
+ if (!batchMode) {
+ lp = tmpLp;
+ try {
+ execute();
+ } catch (Exception e) {
+ int errCode = 1002;
+ String msg = "Unable to store alias "
+ + op.getOperatorKey().getId();
+ throw new FrontendException(msg, errCode,
+ PigException.INPUT, e);
+ }
+ } else {
+ if (0 == ignoreNumStores) {
+ storeOpTable.put((LOStore)op, tmpLp);
+ lp.mergeSharedPlan(tmpLp);
+ List<LogicalOperator> roots = tmpLp.getRoots();
+ for (LogicalOperator root : roots) {
+ if (root instanceof LOLoad) {
+ loadOps.add((LOLoad)root);
+ }
+ }
+
+ } else {
+ --ignoreNumStores;
+ }
+ }
+ }
+ }
+ }
+
+ LogicalPlan parseQuery(String query, int startLine) throws IOException {
+ if (query == null || query.length() == 0) {
+ int errCode = 1084;
+ String msg = "Invalid Query: Query is null or of size 0";
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+
+ query = query.trim();
+
+ try {
+ return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
+ aliases, opTable, aliasOp, startLine, fileNameMap);
+ } catch (ParseException e) {
+ PigException pe = LogUtils.getPigException(e);
+ int errCode = 1000;
+ String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
+ }
+ }
+
+ protected Graph clone() {
+ // There are two choices on how we clone the logical plan
+ // 1 - we really clone each operator and connect up the cloned operators
+ // 2 - we cache away the script till the point we need to clone
+ // and then simply re-parse the script.
+ // The latter approach is used here
+ // FIXME: There is one open issue with this now:
+ // Consider the following script:
+ // A = load 'file:/somefile';
+ // B = filter A by $0 > 10;
+ // store B into 'bla';
+ // rm 'file:/somefile';
+ // A = load 'file:/someotherfile'
+ // when we try to clone - we try to reparse
+ // from the beginning and currently the parser
+ // checks for file existence of files in the load
+ // in the case where the file is a local one -i.e. with file: prefix
+ // This will be a known issue now and we will need to revisit later
+
+ // parse each line of the cached script
+ int lineNumber = 1;
+
+ // create data structures needed for parsing
+ Graph graph = new Graph(isBatchOn());
+ graph.ignoreNumStores = processedStores;
+ graph.processedStores = processedStores;
+ graph.fileNameMap = fileNameMap;
+
+ try {
+ for (Iterator<String> it = getScriptCache().iterator(); it.hasNext(); lineNumber++) {
+ if (isBatchOn()) {
+ graph.registerQuery(it.next(), lineNumber);
+ } else {
+ graph.lp = graph.parseQuery(it.next(), lineNumber);
+ }
+ }
+ graph.postProcess();
+ } catch (IOException ioe) {
+ graph = null;
+ }
+ return graph;
+ }
+
+ private void postProcess() throws IOException {
+
+ // Set the logical plan values correctly in all the operators
+ PlanSetter ps = new PlanSetter(lp);
+ ps.visit();
+
+ // The following code deals with store/load combination of
+ // intermediate files. In this case we will replace the load operator
+ // with a (implicit) split operator, iff the load/store
+ // func is reversible (because that's when we can safely
+ // skip the load and keep going with the split output). If
+ // the load/store func is not reversible (or they are
+ // different functions), we connect the store and the load
+ // to remember the dependency.
+ for (LOLoad load : loadOps) {
+ for (LOStore store : storeOpTable.keySet()) {
+ String ifile = load.getInputFile().getFileName();
+ String ofile = store.getOutputFile().getFileName();
+ if (ofile.compareTo(ifile) == 0) {
+ LoadFunc lFunc = (LoadFunc) pigContext.instantiateFuncFromSpec(load.getInputFile().getFuncSpec());
+ StoreFunc sFunc = (StoreFunc) pigContext.instantiateFuncFromSpec(store.getOutputFile().getFuncSpec());
+ if (lFunc.getClass() == sFunc.getClass() && lFunc instanceof ReversibleLoadStoreFunc) {
+
+ // In this case we remember the input file
+ // spec in the store. We might have to use it
+ // in the MR compiler to recreate the load, if
+ // the store happens on a job boundary.
+ store.setInputSpec(load.getInputFile());
+
+ LogicalOperator storePred = lp.getPredecessors(store).get(0);
+
+ lp.disconnect(store, load);
+ lp.replace(load, storePred);
+
+ List<LogicalOperator> succs = lp.getSuccessors(storePred);
+
+ for (LogicalOperator succ : succs) {
+ MultiMap<LogicalOperator, LogicalPlan> innerPls = null;
+
+ // fix inner plans for cogroup and frjoin operators
+ if (succ instanceof LOCogroup) {
+ innerPls = ((LOCogroup)succ).getGroupByPlans();
+ } else if (succ instanceof LOFRJoin) {
+ innerPls = ((LOFRJoin)succ).getJoinColPlans();
+ }
+
+ if (innerPls != null) {
+ if (innerPls.containsKey(load)) {
+ Collection<LogicalPlan> pls = innerPls.get(load);
+ innerPls.removeKey(load);
+ innerPls.put(storePred, pls);
+ }
+ }
+ }
+ } else {
+ try {
+ store.getPlan().connect(store, load);
+ } catch (PlanException ex) {
+ int errCode = 2128;
+ String msg = "Failed to connect store with dependent load.";
+ throw new FrontendException(msg, errCode, ex);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
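To make postProcess() above concrete: in a batch, a load that reads a file stored earlier in the same script is either spliced onto the store's predecessor (when the load/store func is the same reversible class) or connected to the store as an explicit dependency. A hedged sketch, assuming the builtin BinStorage qualifies as a ReversibleLoadStoreFunc:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class StoreThenLoad {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE, new Properties());
            pig.setBatchOn();
            pig.registerQuery("A = load 'raw';");
            pig.registerQuery("store A into 'mid' using BinStorage();");
            // same file, same func: this load can be replaced by an implicit
            // split on the store's predecessor instead of a re-read of 'mid'
            pig.registerQuery("B = load 'mid' using BinStorage();");
            pig.registerQuery("store B into 'final';");
            pig.executeBatch();
        }
    }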
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Thu Apr 23 16:57:16 2009
@@ -123,8 +123,11 @@
*
* @param plan PhysicalPlan to explain
* @param stream Stream to print output to
+ * @param format Format to print in
+ * @param verbose Amount of information to print
*/
- public void explain(PhysicalPlan plan, PrintStream stream);
+ public void explain(PhysicalPlan plan, PrintStream stream,
+ String format, boolean verbose);
/**
* Return currently running jobs (can be useful for admin purposes)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java Thu Apr 23 16:57:16 2009
@@ -47,7 +47,6 @@
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- str.setPc(pigContext);
spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
pigContext).toString(),
new FuncSpec(BinStorage.class.getName()));
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu Apr 23 16:57:16 2009
@@ -181,7 +181,7 @@
@Override
public String toString() {
- return path.toString();
+ return path.makeQualified(getHFS()).toString();
}
@Override
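The upshot of the HPath change is that paths now stringify fully qualified. A tiny sketch of Hadoop's makeQualified, which toString() now applies (the printed URI is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifiedPathDemo {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Path p = new Path("/user/pig/out");
            // prints e.g. hdfs://namenode:8020/user/pig/out instead of /user/pig/out
            System.out.println(p.makeQualified(fs));
        }
    }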
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Apr 23 16:57:16 2009
@@ -282,16 +282,13 @@
throw new UnsupportedOperationException();
}
- public void explain(PhysicalPlan plan, PrintStream stream) {
+ public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
try {
- PlanPrinter printer = new PlanPrinter(plan, stream);
- printer.visit();
- stream.println();
-
ExecTools.checkLeafIsStore(plan, pigContext);
MapReduceLauncher launcher = new MapReduceLauncher();
- launcher.explain(plan, pigContext, stream);
+ launcher.explain(plan, pigContext, stream, format, verbose);
+
} catch (Exception ve) {
throw new RuntimeException(ve);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 23 16:57:16 2009
@@ -22,12 +22,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@@ -55,6 +51,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -106,85 +103,121 @@
public static final String LOG_DIR = "_logs";
+ private List<Path> tmpPath;
+ private Path curTmpPath;
+
+ public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+ this.pigContext = pigContext;
+ this.conf = conf;
+ tmpPath = new LinkedList<Path>();
+ }
+
+ /**
+ * Moves all the results of a collection of MR jobs to the final
+ * output directory. Some of the results may have been put into a
+ * temp location to work around restrictions with multiple output
+ * from a single map reduce job.
+ *
+ * This method should always be called after the job execution
+ * completes.
+ */
+ public void moveResults() throws IOException {
+ if (curTmpPath != null) {
+ tmpPath.add(curTmpPath);
+ curTmpPath = null;
+ }
+
+ for (Path tmp: tmpPath) {
+ Path abs = new Path(tmp, "abs");
+ Path rel = new Path(tmp, "rel");
+ FileSystem fs = tmp.getFileSystem(conf);
+
+ if (fs.exists(abs)) {
+ moveResults(abs, abs.toUri().getPath(), fs);
+ }
+
+ if (fs.exists(rel)) {
+ moveResults(rel, rel.toUri().getPath()+"/", fs);
+ }
+ }
+ tmpPath = new LinkedList<Path>();
+ }
+
+ /**
+ * Walks the temporary directory structure to move (rename) files
+ * to their final location.
+ */
+ private void moveResults(Path p, String rem, FileSystem fs) throws IOException {
+ for (FileStatus fstat: fs.listStatus(p)) {
+ Path src = fstat.getPath();
+ if (fstat.isDir()) {
+ fs.mkdirs(removePart(src, rem));
+ moveResults(fstat.getPath(), rem, fs);
+ } else {
+ Path dst = removePart(src, rem);
+ fs.rename(src,dst);
+ }
+ }
+ }
+
+ private Path removePart(Path src, String part) {
+ URI uri = src.toUri();
+ String pathStr = uri.getPath().replace(part, "");
+ return new Path(pathStr);
+ }
+
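For clarity, removePart just strips the temporary-directory prefix so a file lands at its final location. A standalone sketch of that path arithmetic (the temp prefix is made up):

    import org.apache.hadoop.fs.Path;

    public class RemovePartDemo {
        public static void main(String[] args) {
            Path src = new Path("/tmp/temp-42/abs/user/out/part-00000");
            // same logic as removePart above: drop the temp prefix
            String stripped = src.toUri().getPath().replace("/tmp/temp-42/abs", "");
            System.out.println(new Path(stripped)); // prints /user/out/part-00000
        }
    }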
+ private void makeTmpPath() throws IOException {
+ if (curTmpPath != null) {
+ tmpPath.add(curTmpPath);
+ }
+
+ for (int tries = 0;;) {
+ try {
+ curTmpPath =
+ new Path(FileLocalizer
+ .getTemporaryPath(null, pigContext).toString());
+ FileSystem fs = curTmpPath.getFileSystem(conf);
+ curTmpPath = curTmpPath.makeQualified(fs);
+ fs.mkdirs(curTmpPath);
+ break;
+ } catch (IOException ioe) {
+ if (++tries==100) {
+ throw ioe;
+ }
+ }
+ }
+ }
+
/**
* The map between MapReduceOpers and their corresponding Jobs
*/
Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
/**
- * Top level compile method that issues a call to the recursive
- * compile method.
+ * Compiles all jobs that have no dependencies, removes them from
+ * the plan and returns. Should be called with the same plan until
+ * exhausted.
* @param plan - The MROperPlan to be compiled
* @param grpName - The name given to the JobControl
- * @param conf - The Configuration object having the various properties
- * @param pigContext - PigContext passed on from the execution engine
- * @return JobControl object
+ * @return JobControl object - null if no more jobs in plan
* @throws JobCreationException
*/
- public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+ public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException{
this.plan = plan;
- this.conf = conf;
- this.pigContext = pigContext;
- JobControl jobCtrl = new JobControl(grpName);
-
- List<MapReduceOper> leaves ;
- leaves = plan.getLeaves();
-
- for (MapReduceOper mro : leaves) {
- jobCtrl.addJob(compile(mro,jobCtrl));
+
+ if (plan.size() == 0) {
+ return null;
}
- return jobCtrl;
- }
-
- /**
- * The recursive compilation method that works by doing a depth first
- * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
- * with the dependencies maintained in jobCtrl
- * @param mro - Input MapReduceOper for which a Job needs to be compiled
- * @param jobCtrl - The running JobCtrl object to maintain dependencies b/w jobs
- * @return Job corresponding to the input mro
- * @throws JobCreationException
- */
- private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
- List<MapReduceOper> pred = plan.getPredecessors(mro);
-
- JobConf currJC = null;
-
- try{
- if(pred==null || pred.size()<=0){
- //No dependencies! Create the JobConf
- //Construct the Job object with it and return
- Job ret = null;
- if(seen.containsKey(mro.getOperatorKey()))
- ret = seen.get(mro.getOperatorKey());
- else{
- currJC = getJobConf(mro, conf, pigContext);
- ret = new Job(currJC,null);
- seen.put(mro.getOperatorKey(), ret);
- }
- return ret;
- }
-
- //Has dependencies. So compile all the inputs
- List<Job> compiledInputs = new ArrayList<Job>(pred.size());
-
- for (MapReduceOper oper : pred) {
- Job ret = null;
- if(seen.containsKey(oper.getOperatorKey()))
- ret = seen.get(oper.getOperatorKey());
- else{
- ret = compile(oper, jobCtrl);
- jobCtrl.addJob(ret);
- seen.put(oper.getOperatorKey(),ret);
- }
- compiledInputs.add(ret);
+
+ JobControl jobCtrl = new JobControl(grpName);
+
+ try {
+ List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
+ roots.addAll(plan.getRoots());
+ for (MapReduceOper mro: roots) {
+ jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+ plan.remove(mro);
}
- //Get JobConf for the current MapReduceOper
- currJC = getJobConf(mro, conf, pigContext);
-
- //Create a new Job with the obtained JobConf
- //and the compiled inputs as dependent jobs
- return new Job(currJC,(ArrayList<Job>)compiledInputs);
} catch (JobCreationException jce) {
throw jce;
} catch(Exception e) {
@@ -192,8 +225,10 @@
String msg = "Internal error creating job configuration.";
throw new JobCreationException(msg, errCode, PigException.BUG, e);
}
+
+ return jobCtrl;
}
-
+
/**
* The method that creates the JobConf corresponding to a MapReduceOper.
* The assumption is that
@@ -225,32 +260,33 @@
//used as the working directory
String user = System.getProperty("user.name");
jobConf.setUser(user != null ? user : "Pigster");
-
- //Process the POLoads
- List<PhysicalOperator> lds = getRoots(mro.mapPlan);
- if(lds!=null && lds.size()>0){
- for (PhysicalOperator operator : lds) {
- POLoad ld = (POLoad)operator;
-
- Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
- //Store the inp filespecs
- inp.add(p);
-
- //Store the target operators for tuples read
- //from this input
- List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
- List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
- if(ldSucs!=null){
- for (PhysicalOperator operator2 : ldSucs) {
- ldSucKeys.add(operator2.getOperatorKey());
+
+ try{
+ //Process the POLoads
+ List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+
+ if(lds!=null && lds.size()>0){
+ for (POLoad ld : lds) {
+
+ Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
+ //Store the inp filespecs
+ inp.add(p);
+
+ //Store the target operators for tuples read
+ //from this input
+ List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+ List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+ if(ldSucs!=null){
+ for (PhysicalOperator operator2 : ldSucs) {
+ ldSucKeys.add(operator2.getOperatorKey());
+ }
}
+ inpTargets.add(ldSucKeys);
+ //Remove the POLoad from the plan
+ mro.mapPlan.remove(ld);
}
- inpTargets.add(ldSucKeys);
- //Remove the POLoad from the plan
- mro.mapPlan.remove(ld);
}
- }
- try{
+
//Create the jar of all functions required
File submitJarFile = File.createTempFile("Job", ".jar");
// ensure the job jar is deleted on exit
@@ -277,27 +313,49 @@
jobConf.setOutputFormat(PigOutputFormat.class);
//Process POStore and remove it from the plan
- POStore st = null;
- if(mro.reducePlan.isEmpty()){
- st = (POStore) mro.mapPlan.getLeaves().get(0);
- mro.mapPlan.remove(st);
- }
- else{
- st = (POStore) mro.reducePlan.getLeaves().get(0);
- mro.reducePlan.remove(st);
- }
- //set out filespecs
- String outputPath = st.getSFile().getFileName();
- FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
- FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
- jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-
- // Setup the logs directory for streaming jobs
- jobConf.set("pig.streaming.log.dir",
- new Path(new Path(outputPath), LOG_DIR).toString());
- jobConf.set("pig.streaming.task.output.dir", outputPath);
+ List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
+ List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+
+ if (mapStores.size() + reduceStores.size() == 1) { // single store case
+ log.info("Setting up single store job");
+
+ POStore st;
+ if (reduceStores.isEmpty()) {
+ st = mapStores.remove(0);
+ mro.mapPlan.remove(st);
+ }
+ else {
+ st = reduceStores.remove(0);
+ mro.reducePlan.remove(st);
+ }
+ String outputPath = st.getSFile().getFileName();
+ FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
+ FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+ jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+
+ jobConf.set("pig.streaming.log.dir",
+ new Path(outputPath, LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", outputPath);
+ }
+ else { // multi store case
+ log.info("Setting up multi store job");
+
+ makeTmpPath();
+ FileSystem fs = curTmpPath.getFileSystem(conf);
+ for (POStore st: mapStores) {
+ Path tmpOut = new Path(
+ curTmpPath,
+ PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
+ fs.mkdirs(tmpOut);
+ }
+
+ FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+
+ jobConf.set("pig.streaming.log.dir",
+ new Path(curTmpPath, LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+ }
-
// store map key type
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
@@ -305,13 +363,9 @@
// set parent plan in all operators in map and reduce plans
// currently the parent plan is really used only when POStream is present in the plan
- PhysicalPlan[] plans = new PhysicalPlan[] { mro.mapPlan, mro.reducePlan };
- for (int i = 0; i < plans.length; i++) {
- for (Iterator<PhysicalOperator> it = plans[i].iterator(); it.hasNext();) {
- PhysicalOperator op = it.next();
- op.setParentPlan(plans[i]);
- }
- }
+ new PhyPlanSetter(mro.mapPlan).visit();
+ new PhyPlanSetter(mro.reducePlan).visit();
+
POPackage pack = null;
if(mro.reducePlan.isEmpty()){
//MapOnly Job
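Putting the new contract together: each compile() call returns one wave of dependency-free jobs and prunes them from the plan, returning null once the plan is exhausted; moveResults() then relocates any multi-store temp output. A sketch of the driving loop under those assumptions (runJobs is a hypothetical stand-in for the launcher's job-monitoring logic):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
    import org.apache.pig.impl.PigContext;

    public class CompileLoop {
        static void launch(MROperPlan plan, PigContext pc, Configuration conf) throws Exception {
            JobControlCompiler jcc = new JobControlCompiler(pc, conf);
            JobControl jc;
            // each wave holds the jobs whose dependencies have already run
            while ((jc = jcc.compile(plan, "pig-jobs")) != null) {
                runJobs(jc);       // hypothetical: start jc and wait for completion
            }
            jcc.moveResults();     // move multi-store temp output to final locations
        }
        static void runJobs(JobControl jc) { /* hypothetical driver */ }
    }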
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Thu Apr 23 16:57:16 2009
@@ -104,13 +104,17 @@
* @param pp PhysicalPlan to explain
* @param pc PigContext to use for configuration
* @param ps PrintStream to write output on.
+ * @param format Format to write in
+ * @param verbose Amount of information to print
* @throws VisitorException
* @throws IOException
*/
public abstract void explain(
PhysicalPlan pp,
PigContext pc,
- PrintStream ps) throws PlanException,
+ PrintStream ps,
+ String format,
+ boolean verbose) throws PlanException,
VisitorException,
IOException;
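
A hedged usage sketch of the widened explain() contract: "text" keeps the classic MRPrinter output, while any other format string selects the new DOT printer (see the MapReduceLauncher hunk below). The plan and context objects are assumed to come from the caller.

    import java.io.IOException;
    import java.io.PrintStream;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.plan.PlanException;
    import org.apache.pig.impl.plan.VisitorException;

    class ExplainSketch {
        // pp and pc are supplied by the caller; this only exercises the
        // two format branches of the new signature.
        static void explainBoth(PhysicalPlan pp, PigContext pc, PrintStream ps)
                throws PlanException, VisitorException, IOException {
            MapReduceLauncher launcher = new MapReduceLauncher();
            launcher.explain(pp, pc, ps, "text", true);   // verbose text form
            launcher.explain(pp, pc, ps, "dot", false);   // DOT graph form
        }
    }
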
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Apr 23 16:57:16 2009
@@ -77,6 +77,7 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
/**
* The compiler that compiles a given physical plan
@@ -129,6 +130,9 @@
//The output of compiling the inputs
MapReduceOper[] compiledInputs = null;
+
+ //Maps each store to the MapReduceOper that contains it.
+ Map<POStore, MapReduceOper> storeToMapReduceMap;
//The split operators seen till now. If not
//maintained they will haunt you.
@@ -175,6 +179,7 @@
}
scope = roots.get(0).getOperatorKey().getScope();
messageCollector = new CompilationMessageCollector() ;
+ storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
}
public void randomizeFileLocalizer(){
@@ -212,15 +217,21 @@
*/
public MROperPlan compile() throws IOException, PlanException, VisitorException {
List<PhysicalOperator> leaves = plan.getLeaves();
- if(!(leaves.get(0) instanceof POStore)) {
- int errCode = 2025;
- String msg = "Expected leaf of reduce plan to " +
- "always be POStore. Found " + leaves.get(0).getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG);
- }
- POStore store = (POStore)leaves.get(0);
- FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
- compile(store);
+
+ for (PhysicalOperator op : leaves) {
+ if (!(op instanceof POStore)) {
+ int errCode = 2025;
+ String msg = "Expected leaf of reduce plan to " +
+ "always be POStore. Found " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ List<POStore> stores = PlanHelper.getStores(plan);
+ for (POStore store: stores) {
+ FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
+ compile(store);
+ }
// I'm quite certain this is not the best way to do this. The issue
// is that for jobs that take multiple map reduce passes, for
@@ -270,6 +281,41 @@
//op.
List<PhysicalOperator> predecessors = plan.getPredecessors(op);
if (predecessors != null && predecessors.size() > 0) {
+ // When processing an entire script (multiquery), we can
+ // get into a situation where a load has
+ // predecessors. This means that it depends on some store
+ // earlier in the plan. We need to take that dependency
+ // and connect the respective MR operators, while at the
+ // same time removing the connection between the Physical
+ // operators. That way the jobs will run in the right
+ // order.
+ if (op instanceof POLoad) {
+
+ if (predecessors.size() != 1) {
+ int errCode = 2125;
+ String msg = "Expected at most one predecessor of load. Got "+predecessors.size();
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ PhysicalOperator p = predecessors.get(0);
+ if (!(p instanceof POStore)) {
+ int errCode = 2126;
+ String msg = "Predecessor of load should be a store. Got "+p.getClass();
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ // Need new operator
+ curMROp = getMROp();
+ curMROp.mapPlan.add(op);
+ MRPlan.add(curMROp);
+
+ MapReduceOper oper = storeToMapReduceMap.get((POStore)p);
+
+ plan.disconnect(op, p);
+ MRPlan.connect(oper, curMROp);
+ return;
+ }
+
Collections.sort(predecessors);
compiledInputs = new MapReduceOper[predecessors.size()];
int i = -1;
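
The situation handled above arises when one script both stores a file and later loads it, which multiquery compilation now keeps in a single plan. A toy sketch of the rewiring idea, independent of Pig's classes: the physical store->load edge is dropped and replaced by an edge between the MR jobs containing the two operators, so the jobs still run in dependency order.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class RewireSketch {
        static Map<String, Set<String>> edges = new HashMap<String, Set<String>>();

        static void connect(String from, String to) {
            if (!edges.containsKey(from)) edges.put(from, new HashSet<String>());
            edges.get(from).add(to);
        }

        static void disconnect(String from, String to) {
            if (edges.containsKey(from)) edges.get(from).remove(to);
        }

        public static void main(String[] args) {
            connect("store:/tmp/out", "load:/tmp/out");    // load depends on store
            // What the hunk above does, conceptually:
            disconnect("store:/tmp/out", "load:/tmp/out"); // plan.disconnect(op, p)
            connect("job[store]", "job[load]");            // MRPlan.connect(oper, curMROp)
            System.out.println(edges);
        }
    }
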
@@ -311,7 +357,9 @@
private POStore getStore(){
POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
- st.setPc(pigContext);
+ // mark store as tmp store. These can be removed by the
+ // optimizer, since the user did not request them.
+ st.setIsTmpStore(true);
return st;
}
@@ -501,7 +549,7 @@
MRPlan.connect(old, ret);
return ret;
}
-
+
/**
* Returns a temporary DFS Path
* @return
@@ -586,14 +634,6 @@
}
}
- /*private void addUDFs(PhysicalPlan plan) throws VisitorException{
- if(plan!=null){
- udfFinderForExpr.setPlan(plan);
- udfFinderForExpr.visit();
- curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
- }
- }*/
-
private void addUDFs(PhysicalPlan plan) throws VisitorException{
if(plan!=null){
udfFinder.setPlan(plan);
@@ -620,6 +660,7 @@
try{
FileSpec fSpec = op.getSplitStore();
MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+ mro.setSplitter(true);
splitsSeen.put(op.getOperatorKey(), mro);
curMROp = startNew(fSpec, mro);
}catch(Exception e){
@@ -641,6 +682,7 @@
public void visitStore(POStore op) throws VisitorException{
try{
+ storeToMapReduceMap.put(op, curMROp);
nonBlocking(op);
}catch(Exception e){
int errCode = 2034;
@@ -1740,5 +1782,5 @@
keyType = p.getResultType();
}
}
-
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Apr 23 16:57:16 2009
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -44,6 +45,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -75,7 +77,7 @@
ExecException,
JobCreationException,
Exception {
- long sleepTime = 5000;
+ long sleepTime = 500;
aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
@@ -84,70 +86,79 @@
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
- JobControlCompiler jcc = new JobControlCompiler();
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
- JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
- int numMRJobs = jc.getWaitingJobs().size();
+ List<Job> failedJobs = new LinkedList<Job>();
+ List<Job> succJobs = new LinkedList<Job>();
+ JobControl jc;
+ int totalMRJobs = mrp.size();
+ int numMRJobsCompl = 0;
+ int numMRJobsCurrent = 0;
+ double lastProg = -1;
//create the exception handler for the job control thread
//and register the handler with the job control thread
JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
- Thread jcThread = new Thread(jc);
- jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
- jcThread.start();
- double lastProg = -1;
- int perCom = 0;
- while(!jc.allFinished()){
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {}
- double prog = calculateProgress(jc, jobClient)/numMRJobs;
- if(prog>=(lastProg+0.01)){
- perCom = (int)(prog * 100);
- if(perCom!=100)
- log.info( perCom + "% complete");
+ while((jc = jcc.compile(mrp, grpName)) != null) {
+ numMRJobsCurrent = jc.getWaitingJobs().size();
+
+ Thread jcThread = new Thread(jc);
+ jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+ jcThread.start();
+
+ while(!jc.allFinished()){
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
+ if(prog>=(lastProg+0.01)){
+ int perCom = (int)(prog * 100);
+ if(perCom!=100)
+ log.info( perCom + "% complete");
+ }
+ lastProg = prog;
}
- lastProg = prog;
- }
-
- //check for the jobControlException first
- //if the job controller fails before launching the jobs then there are
- //no jobs to check for failure
- if(jobControlException != null) {
+
+ //check for the jobControlException first
+ //if the job controller fails before launching the jobs then there are
+ //no jobs to check for failure
+ if(jobControlException != null) {
if(jobControlException instanceof PigException) {
- throw jobControlException;
+ throw jobControlException;
} else {
- int errCode = 2117;
- String msg = "Unexpected error when launching map reduce job.";
- throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+ int errCode = 2117;
+ String msg = "Unexpected error when launching map reduce job.";
+ throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
}
+ }
+
+ numMRJobsCompl += numMRJobsCurrent;
+ failedJobs.addAll(jc.getFailedJobs());
+ succJobs.addAll(jc.getSuccessfulJobs());
+ jcc.moveResults();
+ jc.stop();
}
-
+
// Look to see if any jobs failed. If so, we need to report that.
- List<Job> failedJobs = jc.getFailedJobs();
if (failedJobs != null && failedJobs.size() > 0) {
log.error("Map reduce job failed");
for (Job fj : failedJobs) {
getStats(fj, jobClient, true, pc);
}
- jc.stop();
return false;
}
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
- List<Job> succJobs = jc.getSuccessfulJobs();
- if(succJobs!=null)
+ if(succJobs!=null) {
for(Job job : succJobs){
getStats(job,jobClient, false, pc);
if(aggregateWarning) {
computeWarningAggregate(job, jobClient, warningAggMap);
}
}
-
- jc.stop();
+ }
if(aggregateWarning) {
CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
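
Because jobs are now launched in batches (one JobControl per round of jcc.compile), overall progress combines the jobs completed in earlier rounds with the current batch's fractional progress. A worked sketch of the loop's formula, with calculateProgress stubbed by a constant:

    public class ProgressSketch {
        public static void main(String[] args) {
            // prog = (numMRJobsCompl + calculateProgress(jc, jobClient)) / totalMRJobs
            int numMRJobsCompl = 2;      // jobs finished in earlier rounds
            double batchProgress = 0.6;  // stand-in for calculateProgress(...),
                                         // i.e. job-equivalents done in this batch
            int totalMRJobs = 5;         // mrp.size()
            double prog = (numMRJobsCompl + batchProgress) / totalMRJobs;
            System.out.println((int) (prog * 100) + "% complete");  // 52% complete
        }
    }
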
@@ -162,13 +173,27 @@
public void explain(
PhysicalPlan php,
PigContext pc,
- PrintStream ps) throws PlanException, VisitorException,
+ PrintStream ps,
+ String format,
+ boolean verbose) throws PlanException, VisitorException,
IOException {
log.trace("Entering MapReduceLauncher.explain");
MROperPlan mrp = compile(php, pc);
- MRPrinter printer = new MRPrinter(ps, mrp);
- printer.visit();
+ if (format.equals("text")) {
+ MRPrinter printer = new MRPrinter(ps, mrp);
+ printer.setVerbose(verbose);
+ printer.visit();
+ } else {
+ ps.println("#--------------------------------------------------");
+ ps.println("# Map Reduce Plan ");
+ ps.println("#--------------------------------------------------");
+
+ DotMRPrinter printer =new DotMRPrinter(mrp, ps);
+ printer.setVerbose(verbose);
+ printer.dump();
+ ps.println("");
+ }
}
private MROperPlan compile(
@@ -197,10 +222,6 @@
POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
pkgAnnotator.visit();
- // check whether stream operator is present
- MRStreamHandler checker = new MRStreamHandler(plan);
- checker.visit();
-
// optimize joins
LastInputStreamingOptimizer liso =
new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
@@ -211,6 +232,27 @@
// an appropriate NullableXXXWritable object
KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
kdv.visit();
+
+ // removes the filter(constant(true)) operators introduced by
+ // splits.
+ NoopFilterRemover fRem = new NoopFilterRemover(plan);
+ fRem.visit();
+
+ MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+ mqOptimizer.visit();
+
+ // removes unnecessary stores (as can happen with splits in
+ // some cases). This has to run after the MultiQueryOptimizer
+ // and the NoopFilterRemover.
+ NoopStoreRemover sRem = new NoopStoreRemover(plan);
+ sRem.visit();
+
+ // check whether a stream operator is present; this must run
+ // after the MultiQueryOptimizer because it can shift streams
+ // from map to reduce, etc.
+ MRStreamHandler checker = new MRStreamHandler(plan);
+ checker.visit();
+
return plan;
}
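
For context on the first of the new cleanup passes: splits leave behind filter operators whose predicate is constant(true), which pass every record. A toy illustration of why such filters can be spliced out of a pipeline (the actual visitor ships in NoopFilterRemover.java, added by this commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class NoopFilterSketch {
        public static void main(String[] args) {
            // A filter that accepts everything changes no results,
            // so cutting it out leaves the pipeline equivalent.
            List<String> pipeline = new ArrayList<String>(
                Arrays.asList("load", "filter[constant(true)]", "store"));
            for (Iterator<String> it = pipeline.iterator(); it.hasNext();) {
                if (it.next().equals("filter[constant(true)]")) it.remove();
            }
            System.out.println(pipeline);   // [load, store]
        }
    }
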
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Apr 23 16:57:16 2009
@@ -109,6 +109,10 @@
// to add additional map reduce operator with 1 reducer after this
long limit = -1;
+ // Indicates that this MROper is a splitter MROper.
+ // That is, this MROper ends due to a POSplit operator.
+ private boolean splitter = false;
+
public MapReduceOper(OperatorKey k) {
super(k);
mapPlan = new PhysicalPlan();
@@ -323,4 +327,16 @@
public void setReplFiles(FileSpec[] replFiles) {
this.replFiles = replFiles;
}
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
+
+ public void setSplitter(boolean spl) {
+ splitter = spl;
+ }
+
+ public boolean isSplitter() {
+ return splitter;
+ }
}
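
A small sketch of how the new splitter flag can be consulted; the MROper class below is a hypothetical stand-in for MapReduceOper, and the real merge decisions live in MultiQueryOptimizer.java, added by this commit.

    import java.util.Arrays;

    public class SplitterFlagSketch {
        static class MROper {                 // stand-in for MapReduceOper
            private boolean splitter = false;
            void setSplitter(boolean spl) { splitter = spl; }
            boolean isSplitter() { return splitter; }
        }

        public static void main(String[] args) {
            MROper a = new MROper();          // ends in a POSplit
            a.setSplitter(true);
            MROper b = new MROper();          // ordinary job
            for (MROper op : Arrays.asList(a, b)) {
                System.out.println(op.isSplitter()
                    ? "candidate for multi-query merge"
                    : "left as is");
            }
        }
    }
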
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Apr 23 16:57:16 2009
@@ -57,7 +57,7 @@
import org.apache.pig.impl.util.Pair;
public class PigInputFormat implements InputFormat<Text, Tuple>,
- JobConfigurable {
+ JobConfigurable {
public static final Log LOG = LogFactory
.getLog(PigInputFormat.class);
@@ -200,11 +200,25 @@
for (int i = 0; i < inputs.size(); i++) {
try {
Path path = new Path(inputs.get(i).first.getFileName());
- FileSystem fs = path.getFileSystem(job);
+
+ FileSystem fs;
+
+ try {
+ fs = path.getFileSystem(job);
+ } catch (Exception e) {
+ // If an application-specific scheme was used
+ // (e.g. "hbase://table"), getting the file
+ // system will fail. That's ok; we just use
+ // the dfs in that case.
+ fs = new Path("/").getFileSystem(job);
+ }
+
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(pigContext.getExecType() == ExecType.MAPREDUCE)
+ if(pigContext.getExecType() == ExecType.MAPREDUCE) {
fs.setWorkingDirectory(new Path("/user", job.getUser()));
+ }
DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
ValidatingInputFileSpec spec;