You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/14 19:57:57 UTC
svn commit: r764904 - in /hadoop/pig/branches/multiquery: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/...
Author: pradeepkth
Date: Tue Apr 14 17:57:56 2009
New Revision: 764904
URL: http://svn.apache.org/viewvc?rev=764904&view=rev
Log:
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Tue Apr 14 17:57:56 2009
@@ -600,3 +600,5 @@
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+ PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Tue Apr 14 17:57:56 2009
@@ -884,7 +884,8 @@
this.batchMode = batchMode;
this.processedStores = 0;
this.ignoreNumStores = 0;
- this.jobName = "DefaultJobName";
+ this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME,
+ PigContext.JOB_NAME_PREFIX+":DefaultJobName");
this.lp = new LogicalPlan();
};
@@ -901,7 +902,7 @@
boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
void execute() throws ExecException, FrontendException {
- pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + jobName);
+ pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
PigServer.this.execute(null);
processedStores = storeOpTable.keySet().size();
}
@@ -911,7 +912,7 @@
}
void setJobName(String name) {
- jobName = name;
+ jobName = PigContext.JOB_NAME_PREFIX+":"+name;
}
LogicalPlan getPlan(String alias) throws IOException {
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Apr 14 17:57:56 2009
@@ -222,10 +222,6 @@
POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
pkgAnnotator.visit();
- // check whether stream operator is present
- MRStreamHandler checker = new MRStreamHandler(plan);
- checker.visit();
-
// optimize joins
LastInputStreamingOptimizer liso =
new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
@@ -250,6 +246,12 @@
// NoopFilterRemover.
NoopStoreRemover sRem = new NoopStoreRemover(plan);
sRem.visit();
+
+ // check whether stream operator is present
+ // after MultiQueryOptimizer because it can shift streams from
+ // map to reduce, etc.
+ MRStreamHandler checker = new MRStreamHandler(plan);
+ checker.visit();
return plan;
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java Tue Apr 14 17:57:56 2009
@@ -71,6 +71,7 @@
for (Pair<POFilter, PhysicalPlan> pair: removalQ) {
removeFilter(pair.first, pair.second);
}
+ removalQ.clear();
}
@Override
@@ -94,7 +95,15 @@
private void removeFilter(POFilter filter, PhysicalPlan plan) {
if (plan.size() > 1) {
try {
+ List<PhysicalOperator> fInputs = filter.getInputs();
+ List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
plan.removeAndReconnect(filter);
+ if(sucs!=null && sucs.size()!=0){
+ for (PhysicalOperator suc : sucs) {
+ suc.setInputs(fInputs);
+ }
+ }
} catch (PlanException pe) {
log.info("Couldn't remove a filter in optimizer: "+pe.getMessage());
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Tue Apr 14 17:57:56 2009
@@ -65,6 +65,7 @@
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
+ popWalker();
}
}
@@ -115,6 +116,7 @@
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
+ popWalker();
}
}
@@ -258,6 +260,7 @@
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
+ popWalker();
}
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Apr 14 17:57:56 2009
@@ -95,6 +95,8 @@
private static Result empty = new Result(POStatus.STATUS_NULL, null);
+ private boolean inpEOP = false;
+
/**
* Constructs an operator with the specified key
* @param k the operator key
@@ -200,6 +202,12 @@
@Override
public Result getNext(Tuple t) throws ExecException {
+
+ if (this.parentPlan.endOfAllInput) {
+
+ return getStreamCloseResult();
+
+ }
if (processedSet.cardinality() == myPlans.size()) {
@@ -258,5 +266,53 @@
return res;
}
+
+ private Result getStreamCloseResult() throws ExecException { // end-of-all-input path: getNext() delegates here when parentPlan.endOfAllInput is set
+ Result res = null;
+ // Returns the first non-EOP result from a nested plan's leaf, an ERR result from processInput(), or EOP once the last nested plan returns EOP while inpEOP is set.
+ while (true) {
+
+ if (processedSet.cardinality() == myPlans.size()) { // every nested plan is marked in processedSet: ask for the next input
+ Result inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_OK) {
+ Tuple tuple = (Tuple)inp.result;
+ for (PhysicalPlan pl : myPlans) { // attach the same tuple to every nested plan
+ pl.attachInput(tuple);
+ }
+ inpEOP = false;
+ } else if (inp.returnStatus == POStatus.STATUS_EOP){
+ inpEOP = true; // remembered so the loop can stop after one more pass over the nested plans
+ } else if (inp.returnStatus == POStatus.STATUS_NULL) {
+ inpEOP = false;
+ } else if (inp.returnStatus == POStatus.STATUS_ERR) {
+ return inp; // hand the error result back unchanged
+ }
+ processedSet.clear(); // unmark all nested plans to begin a fresh pass
+ }
+ // Pull from the lowest-indexed nested plan that is not yet marked in processedSet.
+ int idx = processedSet.nextClearBit(0);
+ PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0); // only the first leaf of the nested plan is consulted
+
+ res = leaf.getNext(dummyTuple);
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ processedSet.set(idx++); // mark this plan, then advance idx so the test below sees the next plan's index
+ if (idx < myPlans.size()) {
+ continue; // more nested plans remain in this pass
+ }
+ } else {
+ break; // any non-EOP result is returned to the caller as-is
+ }
+ // Reached only when the last nested plan returned EOP, so res is necessarily EOP here.
+ if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {
+ continue; // input has not reported EOP yet: loop back and call processInput() again
+ } else {
+ break; // input reported EOP as well: return the EOP result
+ }
+ }
+
+ return res;
+
+ }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Apr 14 17:57:56 2009
@@ -65,6 +65,7 @@
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.hadoop.fs.Path;
public class QueryParser {
private PigContext pigContext;
@@ -213,6 +214,13 @@
path = uri.getSchemeSpecificPart();
}
+ if ((scheme == null && pigContext.getExecType() == ExecType.MAPREDUCE) ||
+ "hdfs".equalsIgnoreCase(scheme)) {
+ // We need to get the path from a hadoop path object,
+ // otherwise special glob characters could get removed.
+ path = new Path(fname).toUri().getPath();
+ }
+
if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
if (pigContext.getExecType() != ExecType.LOCAL) {
if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java Tue Apr 14 17:57:56 2009
@@ -322,6 +322,7 @@
if (batch) {
setBatchOn();
+ mPigServer.setJobName(script);
try {
loadScript(script, true, mLoadOnly, params, files);
executeBatch();
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java Tue Apr 14 17:57:56 2009
@@ -132,7 +132,7 @@
@Test
public void testLoadRemoteRelScheme() throws Exception {
- checkLoadPath("hdfs:test","/tmp/test");
+ checkLoadPath("test","/tmp/test");
}
@Test
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java?rev=764904&r1=764903&r2=764904&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Tue Apr 14 17:57:56 2009
@@ -217,7 +217,7 @@
@Test
public void testStoreRemoteRelScheme() throws Exception {
- checkStorePath("hdfs:test","/tmp/test");
+ checkStorePath("test","/tmp/test");
}
@Test