You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text version.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/17 19:22:45 UTC
svn commit: r911135 - in /hadoop/pig/branches/load-store-redesign: ./
src/org/apache/pig/builtin/ src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/
test/org/apache/pig/test/utils/
Author: pradeepkth
Date: Wed Feb 17 18:22:45 2010
New Revision: 911135
URL: http://svn.apache.org/viewvc?rev=911135&view=rev
Log:
PIG-1216: New load store design does not allow Pig to validate inputs and outputs up front (ashutoshc via pradeepkth)
Modified:
hadoop/pig/branches/load-store-redesign/CHANGES.txt
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java
Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Wed Feb 17 18:22:45 2010
@@ -121,6 +121,9 @@
BUG FIXES
+PIG-1216: New load store design does not allow Pig to validate inputs and
+outputs up front (ashutoshc via pradeepkth)
+
PIG-1239: PigContext.connect() should not create a jobClient and jobClient
should be created on demand when needed (pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Wed Feb 17 18:22:45 2010
@@ -42,7 +42,6 @@
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
-import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
@@ -63,7 +62,6 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.util.LogUtils;
public class BinStorage extends FileInputLoadFunc
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Wed Feb 17 18:22:45 2010
@@ -21,9 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
@@ -50,9 +48,7 @@
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java Wed Feb 17 18:22:45 2010
@@ -19,10 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.pig.FuncSpec;
import org.apache.pig.StoreFunc;
@@ -33,8 +30,6 @@
import org.apache.pig.impl.plan.RequiredFields;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Wed Feb 17 18:22:45 2010
@@ -19,22 +19,19 @@
import java.io.IOException;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigException;
+import org.apache.pig.StoreFunc;
import org.apache.pig.impl.PigContext ;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanWalker;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.impl.plan.PlanValidationException;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
/***
* Visitor for checking input/output files
@@ -56,85 +53,52 @@
super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
pigCtx = pigContext ;
msgCollector = messageCollector ;
+
}
/***
- * The logic here is just to check that the file(s) do not exist
+ * The logic here is to delegate the validation of output specification
+ * to output format implementation.
*/
@Override
protected void visit(LOStore store) throws PlanValidationException{
- // make sure that the file doesn't exist
- String filename = store.getOutputFile().getFileName() ;
+
+ StoreFunc sf = store.getStoreFunc();
+ String outLoc = store.getOutputFile().getFileName();
+ Job dummyJob;
+ String errMsg = "Unexpected error. Could not validate the output " +
+ "specification for: "+outLoc;
+ int errCode = 2116;
try {
- if (checkFileExists(filename)) {
- byte errSrc = pigCtx.getErrorSource();
- int errCode = 0;
- switch(errSrc) {
- case PigException.BUG:
- errCode = 2002;
- break;
- case PigException.REMOTE_ENVIRONMENT:
- errCode = 6000;
- break;
- case PigException.USER_ENVIRONMENT:
- errCode = 4000;
- break;
- }
- String msg = "The output file(s): " + filename
- + " already exists";
- msgCollector.collect(msg, MessageType.Error) ;
- throw new PlanValidationException(msg, errCode, errSrc);
- }
- } catch (PlanValidationException pve) {
- throw pve;
+ dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
+ sf.setStoreLocation(outLoc, dummyJob);
+ } catch (IOException ioe) {
+ msgCollector.collect(errMsg, MessageType.Error) ;
+ throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ioe);
+ }
+ try {
+ sf.getOutputFormat().checkOutputSpecs(dummyJob);
} catch (IOException ioe) {
byte errSrc = pigCtx.getErrorSource();
- int errCode = 0;
+ errCode = 0;
switch(errSrc) {
case PigException.BUG:
- errCode = 2003;
+ errCode = 2002;
break;
case PigException.REMOTE_ENVIRONMENT:
- errCode = 6001;
+ errCode = 6000;
break;
case PigException.USER_ENVIRONMENT:
- errCode = 4001;
+ errCode = 4000;
break;
}
-
- String msg = "Cannot read from the storage where the output "
- + filename + " will be stored ";
- msgCollector.collect(msg, MessageType.Error) ;
- throw new PlanValidationException(msg, errCode, errSrc, ioe);
- } catch (Exception e) {
- int errCode = 2116;
- String msg = "Unexpected error. Could not check for the existence of the file(s): " + filename;
- msgCollector.collect(msg, MessageType.Error) ;
- throw new PlanValidationException(msg, errCode, PigException.BUG, e);
+ errMsg = "Output specification is invalid: "+outLoc;
+ msgCollector.collect(errMsg, MessageType.Error) ;
+ throw new PlanValidationException(errMsg, errCode, errSrc, ioe);
+ } catch (InterruptedException ie) {
+ msgCollector.collect(errMsg, MessageType.Error) ;
+ throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ie);
}
}
-
- /***
- * Check if the file(s) exist. There are two cases :-
- * 1) Exact match
- * 2) Globbing match
- * TODO: Add globbing support in local execution engine
- * and then make this check for local FS support too
- */
- private boolean checkFileExists(String filename) throws IOException {
- if (pigCtx.getExecType() == ExecType.LOCAL) {
- ElementDescriptor elem = pigCtx.getLfs().asElement(filename) ;
- return elem.exists() ;
- }
- else if (pigCtx.getExecType() == ExecType.MAPREDUCE) {
- // TODO: Have to put the staging from local to HDFS somewhere else
- // This does actual file check + glob check
- return FileLocalizer.fileExists(filename, pigCtx) ;
- }
- else { // if ExecType is something else)
- throw new RuntimeException("Undefined state in " + this.getClass()) ;
- }
- }
-
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java Wed Feb 17 18:22:45 2010
@@ -78,10 +78,10 @@
LoadFunc.getAbsolutePath(absPath, curHdfsDir));
}
- @Test(expected=FrontendException.class)
+ @Test
public void testGetAbsolutePath4() throws IOException {
- // test case: incompatible schemes
- Assert.assertEquals("hdfs://myhost:123455/data/passwd",
+ // test case: non dfs scheme
+ Assert.assertEquals("http://myhost:12345/data/passwd",
LoadFunc.getAbsolutePath("http://myhost:12345/data/passwd",
curHdfsDir));
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Wed Feb 17 18:22:45 2010
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
@@ -63,9 +64,13 @@
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.InputOutputFileVisitor;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanValidationException;
import org.apache.pig.pen.physicalOperators.POCounter;
import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.LogicalPlanTester;
import org.apache.pig.test.utils.TestHelper;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.After;
@@ -125,6 +130,56 @@
}
@Test
+ public void testValidation() throws Exception{
+
+ String outputFileName = "test-output.txt";
+ try {
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load '" + inputFileName + "' as (c:chararray, " +
+ "i:int,d:double);");
+ LogicalPlan lp = lpt.buildPlan("store a into '" + outputFileName + "' using " +
+ "PigStorage();");
+ InputOutputFileVisitor visitor = new InputOutputFileVisitor(lp, null, pig.getPigContext());
+ visitor.visit();
+ } catch (PlanValidationException e){
+ // Since output file is not present, validation should pass
+ // and not throw this exception.
+ fail("Store validation test failed.");
+ } finally {
+ Util.deleteFile(pig.getPigContext(), outputFileName);
+ }
+ }
+
+ @Test
+ public void testValidationFailure() throws Exception{
+
+ String input[] = new String[] { "some data" };
+ String outputFileName = "test-output.txt";
+ boolean sawException = false;
+ try {
+ Util.createInputFile(pig.getPigContext(),outputFileName, input);
+ LogicalPlanTester lpt = new LogicalPlanTester(pig.getPigContext());
+ lpt.buildPlan("a = load '" + inputFileName + "' as (c:chararray, " +
+ "i:int,d:double);");
+ LogicalPlan lp = lpt.buildPlan("store a into '" + outputFileName +
+ "' using PigStorage();");
+ InputOutputFileVisitor visitor = new InputOutputFileVisitor(lp,
+ new CompilationMessageCollector(), pig.getPigContext());
+ visitor.visit();
+ } catch (PlanValidationException pve){
+ // Since output file is present, validation should fail
+ // and throw this exception
+ assertEquals(6000,pve.getErrorCode());
+ assertEquals(PigException.REMOTE_ENVIRONMENT, pve.getErrorSource());
+ assertTrue(pve.getCause() instanceof IOException);
+ sawException = true;
+ } finally {
+ assertTrue(sawException);
+ Util.deleteFile(pig.getPigContext(), outputFileName);
+ }
+ }
+
+ @Test
public void testStore() throws Exception {
inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
storeAndCopyLocally(inpDB);
@@ -402,7 +457,6 @@
public void storeStatistics(ResourceStatistics stats, String location,
Configuration conf) throws IOException {
}
-
}
private void checkStorePath(String orig, String expected) throws Exception {
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java Wed Feb 17 18:22:45 2010
@@ -59,11 +59,17 @@
private Map<OperatorKey, LogicalOperator> logicalOpTable = null ;
private Map<String, LogicalOperator> aliasOp = null ;
private Map<String, String> fileNameMap = null ;
+ private PigContext pigContext;
public LogicalPlanTester() {
- reset() ;
+ this(new PigContext(ExecType.MAPREDUCE, new Properties()));
}
+ public LogicalPlanTester(PigContext pc) {
+ pigContext = pc;
+ reset() ;
+ }
+
/***
* Reset state
*/
@@ -203,7 +209,6 @@
private LogicalPlan buildPlan(String query, ClassLoader cldr) {
LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
- PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
try {
pigContext.connect();
} catch (ExecException e1) {
@@ -252,7 +257,7 @@
private LogicalPlan buildPlanThrowExceptionOnError (String query, ClassLoader cldr) throws IOException, ParseException {
LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
- PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+
try {
pigContext.connect();
} catch (ExecException e1) {