You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ha...@apache.org on 2010/02/26 20:33:27 UTC
svn commit: r916793 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/validators/
test/org/apache/pig/test/
Author: hashutosh
Date: Fri Feb 26 19:33:27 2010
New Revision: 916793
URL: http://svn.apache.org/viewvc?rev=916793&view=rev
Log:
PIG-1251: Move SortInfo calculation earlier in compilation
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 26 19:33:27 2010
@@ -58,6 +58,8 @@
IMPROVEMENTS
+PIG-1251: Move SortInfo calculation earlier in compilation (ashutoshc)
+
PIG-1233: NullPointerException in AVG (ankur via olgan)
PIG-1218: Use distributed cache to store samples (rding via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb 26 19:33:27 2010
@@ -54,7 +54,13 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -66,12 +72,15 @@
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
-import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.pen.ExampleGenerator;
@@ -837,6 +846,9 @@
PlanSetter ps = new PlanSetter(lpClone);
ps.visit();
+ SortInfoSetter sortInfoSetter = new SortInfoSetter(lpClone);
+ sortInfoSetter.visit();
+
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
FrontendException caught = null;
@@ -909,6 +921,56 @@
}
return lp;
}
+
+ /**
+  * Logical-plan visitor that records on every {@link LOStore} the {@link SortInfo}
+  * of the data it writes, so that sort information is available on the logical
+  * plan before physical compilation.
+  *
+  * A store is treated as sorted when its input comes from an {@link LOSort},
+  * either directly, through an intermediate {@link LOLimit}, or through an
+  * always-true {@link LOSplitOutput} (the split injected when the same relation
+  * is stored more than once). Otherwise the store's SortInfo is set to null.
+  */
+ public static class SortInfoSetter extends LOVisitor{
+
+     public SortInfoSetter(LogicalPlan plan) {
+         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+     }
+
+     /**
+      * Computes and attaches the SortInfo for one store.
+      *
+      * @param store the store operator being visited
+      * @throws VisitorException if the store has no predecessor (error 2051),
+      *         or if {@code LOSort.getSortInfo} fails
+      */
+     @Override
+     protected void visit(LOStore store) throws VisitorException {
+
+         // Look up the predecessor null/empty-safely: indexing the list first would
+         // throw before the 2051 check below could ever run.
+         LogicalOperator storePred = firstPredecessor(store);
+         if(storePred == null){
+             int errCode = 2051;
+             String msg = "Did not find a predecessor for Store." ;
+             throw new VisitorException(msg, errCode, PigException.BUG);
+         }
+
+         SortInfo sortInfo = null;
+         if(storePred instanceof LOLimit) {
+             // If the store reads from a limit, check the limit's predecessor instead.
+             storePred = firstPredecessor(storePred);
+         } else if (storePred instanceof LOSplitOutput) {
+             LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+             // We assume this is the LOSplitOutput we injected for this case:
+             // b = order a by $0; store b into '1'; store b into '2';
+             // In this case, we should mark both '1' and '2' as sorted
+             LogicalPlan conditionPlan = splitOutput.getConditionPlan();
+             if (conditionPlan.getRoots().size()==1) {
+                 LogicalOperator root = conditionPlan.getRoots().get(0);
+                 // Only a constant 'true' condition marks the injected pass-through split.
+                 if (root instanceof LOConst
+                         && Boolean.TRUE.equals(((LOConst)root).getValue())) {
+                     LogicalOperator split = firstPredecessor(splitOutput);
+                     if (split instanceof LOSplit) {
+                         storePred = firstPredecessor(split);
+                     }
+                 }
+             }
+         }
+         // if this predecessor is a sort, get
+         // the sort info.
+         if(storePred instanceof LOSort) {
+             try {
+                 sortInfo = ((LOSort)storePred).getSortInfo();
+             } catch (FrontendException e) {
+                 throw new VisitorException(e);
+             }
+         }
+         store.setSortInfo(sortInfo);
+     }
+
+     /**
+      * Returns the first predecessor of {@code op} in its plan, or null when the
+      * plan reports no predecessors (a null or empty list).
+      */
+     private static LogicalOperator firstPredecessor(LogicalOperator op) {
+         java.util.List<LogicalOperator> preds = op.getPlan().getPredecessors(op);
+         return (preds == null || preds.isEmpty()) ? null : preds.get(0);
+     }
+ }
/*
* This class holds the internal states of a grunt shell session.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 26 19:33:27 2010
@@ -431,16 +431,12 @@
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
- if (st.getSchema()!=null)
- sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
}
for (POStore st: reduceStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
- if (st.getSchema()!=null)
- sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Feb 26 19:33:27 2010
@@ -1605,6 +1605,7 @@
store.setSFile(loStore.getOutputFile());
store.setInputSpec(loStore.getInputSpec());
store.setSignature(loStore.getSignature());
+ store.setSortInfo(loStore.getSortInfo());
try {
// create a new schema for ourselves so that when
// we serialize we are not serializing objects that
@@ -1621,47 +1622,8 @@
}
currentPlan.add(store);
- List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore);
- PhysicalOperator from;
+ PhysicalOperator from = logToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
- if(op != null) {
- from = logToPhyMap.get(op.get(0));
- SortInfo sortInfo = null;
- // if store's predecessor is limit,
- // check limit's predecessor
- if(op.get(0) instanceof LOLimit) {
- op = loStore.getPlan().getPredecessors(op.get(0));
- } else if (op.get(0) instanceof LOSplitOutput) {
- LOSplitOutput splitOutput = (LOSplitOutput)op.get(0);
- // We assume this is the LOSplitOutput we injected for this case:
- // b = order a by $0; store b into '1'; store b into '2';
- // In this case, we should mark both '1' and '2' as sorted
- LogicalPlan conditionPlan = splitOutput.getConditionPlan();
- if (conditionPlan.getRoots().size()==1) {
- LogicalOperator root = conditionPlan.getRoots().get(0);
- if (root instanceof LOConst) {
- Object value = ((LOConst)root).getValue();
- if (value instanceof Boolean && (Boolean)value==true) {
- LogicalOperator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
- if (split instanceof LOSplit)
- op = loStore.getPlan().getPredecessors(split);
- }
- }
- }
- }
- PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
- // if this predecessor is a sort, get
- // the sort info.
- if(op.get(0) instanceof LOSort) {
- sortInfo = ((POSort)sortPhyOp).getSortInfo();
- }
- store.setSortInfo(sortInfo);
- } else {
- int errCode = 2051;
- String msg = "Did not find a predecessor for Store." ;
- throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
- }
-
try {
currentPlan.connect(from, store);
} catch (PlanException e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Fri Feb 26 19:33:27 2010
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.pig.FuncSpec;
+import org.apache.pig.SortInfo;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -47,6 +48,16 @@
transient private StoreFuncInterface mStoreFunc;
private static Log log = LogFactory.getLog(LOStore.class);
+
+     /** Sort order of the data written by this store; may be null. */
+     private SortInfo sortInfo;
+
+     /**
+      * Returns the sort information recorded for this store.
+      *
+      * @return the SortInfo last passed to {@link #setSortInfo}, or null if none was set
+      */
+     public SortInfo getSortInfo() {
+         return this.sortInfo;
+     }
+
+     /**
+      * Records sort information for the data this store writes.
+      *
+      * @param sortInfo the sort information to keep; may be null
+      */
+     public void setSortInfo(SortInfo sortInfo) {
+         this.sortInfo = sortInfo;
+     }
/**
* @param plan
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Fri Feb 26 19:33:27 2010
@@ -21,8 +21,10 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext ;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
@@ -65,10 +67,23 @@
StoreFuncInterface sf = store.getStoreFunc();
String outLoc = store.getOutputFile().getFileName();
- Job dummyJob;
String errMsg = "Unexpected error. Could not validate the output " +
- "specification for: "+outLoc;
+ "specification for: "+outLoc;
int errCode = 2116;
+
+ try {
+ if(store.getSchema() != null){
+ sf.checkSchema(new ResourceSchema(store.getSchema(), store.getSortInfo()));
+ }
+ } catch (FrontendException e) {
+ msgCollector.collect(errMsg, MessageType.Error) ;
+ throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), e);
+ } catch (IOException e) {
+ msgCollector.collect(errMsg, MessageType.Error) ;
+ throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), e);
+ }
+
+ Job dummyJob;
try {
dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java Fri Feb 26 19:33:27 2010
@@ -62,9 +62,7 @@
if (!pigContext.inExplain) {
// When running explain we don't want to check for input
// files.
- // Temporarily disabling InputOutputFileValidator on trunk
- // till PIG-1251 and PIG-1216 are addressed.
- //validatorList.add(new InputOutputFileValidator(pigContext)) ;
+ validatorList.add(new InputOutputFileValidator(pigContext)) ;
}
// This one has to be done before the type checker.
//validatorList.add(new TypeCastInserterValidator()) ;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java Fri Feb 26 19:33:27 2010
@@ -65,8 +65,6 @@
}
-// Comment out until PIG-1251 solved
-/*
@Test
public void testLocalModeNegative2() throws Throwable {
@@ -93,7 +91,6 @@
}
}
-*/
@Test
public void testMapReduceModeInputPositive() throws Throwable {
@@ -114,8 +111,6 @@
}
-// Comment out until PIG-1251 solved
-/*
@Test
public void testMapReduceModeInputNegative2() throws Throwable {
@@ -142,7 +137,6 @@
}
}
-*/
private LogicalPlan genNewLoadStorePlan(String inputFile,
String outputFile, DataStorage dfs)
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java Fri Feb 26 19:33:27 2010
@@ -613,6 +613,8 @@
lpt.buildPlan("a = load 'bla' as (i:int, n:chararray, d:double);");
lpt.buildPlan("b = order a by i, d;");
LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
PhysicalPlan pp = buildPhysicalPlan(lp);
SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
SortInfo expected = getSortInfo(
@@ -635,6 +637,8 @@
lpt.buildPlan("b = filter a by i > 10;");
lpt.buildPlan("c = order b by i desc, d;");
LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
PhysicalPlan pp = buildPhysicalPlan(lp);
SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
SortInfo expected = getSortInfo(
@@ -657,6 +661,8 @@
lpt.buildPlan("a = load 'bla' as (i:int, n:chararray, d:double);");
lpt.buildPlan("b = filter a by i > 10;");
LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
PhysicalPlan pp = buildPhysicalPlan(lp);
SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
assertEquals(null, si);
@@ -675,6 +681,8 @@
lpt.buildPlan("c = filter b by i > 10;");
LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
PhysicalPlan pp = buildPhysicalPlan(lp);
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
assertEquals(null, si);
}
@@ -691,6 +699,8 @@
lpt.buildPlan("b = order a by i, d desc;");
lpt.buildPlan("c = limit b 10;");
LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
LOPrinter lpr = new LOPrinter(System.err, lp);
lpr.visit();
PhysicalPlan pp = buildPhysicalPlan(lp);
@@ -750,6 +760,8 @@
lpt.buildPlan("a = load 'bla' ;");
lpt.buildPlan("b = order a by $0;");
LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+ PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp);
+ siSetter.visit();
PhysicalPlan pp = buildPhysicalPlan(lp);
SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
SortInfo expected = getSortInfo(Arrays.asList(new String[] {null}),