You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/17 19:22:45 UTC

svn commit: r911135 - in /hadoop/pig/branches/load-store-redesign: ./ src/org/apache/pig/builtin/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: pradeepkth
Date: Wed Feb 17 18:22:45 2010
New Revision: 911135

URL: http://svn.apache.org/viewvc?rev=911135&view=rev
Log:
PIG-1216: New load store design does not allow Pig to validate inputs and outputs up front (ashutoshc via pradeepkth)

Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Wed Feb 17 18:22:45 2010
@@ -121,6 +121,9 @@
 
 BUG FIXES
 
+PIG-1216: New load store design does not allow Pig to validate inputs and
+outputs up front (ashutoshc via pradeepkth)
+
 PIG-1239: PigContext.connect() should not create a jobClient and jobClient
 should be created on demand when needed (pradeepkth)
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Wed Feb 17 18:22:45 2010
@@ -42,7 +42,6 @@
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
@@ -63,7 +62,6 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.util.LogUtils;
 
 public class BinStorage extends FileInputLoadFunc 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Wed Feb 17 18:22:45 2010
@@ -21,9 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -50,9 +48,7 @@
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.StorageUtil;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java Wed Feb 17 18:22:45 2010
@@ -19,10 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.StoreFunc; 
@@ -33,8 +30,6 @@
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Wed Feb 17 18:22:45 2010
@@ -19,22 +19,19 @@
 
 import java.io.IOException;
 
-import org.apache.pig.ExecType; 
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigException;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.impl.PigContext ;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanWalker;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.impl.plan.PlanValidationException;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 
 /***
  * Visitor for checking input/output files
@@ -56,85 +53,52 @@
         super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
         pigCtx = pigContext ;
         msgCollector = messageCollector ;
+        
     }
    
     /***
-     * The logic here is just to check that the file(s) do not exist
+     * The logic here is to delegate the validation of output specification
+     * to output format implementation.
      */
     @Override
     protected void visit(LOStore store) throws PlanValidationException{
-        // make sure that the file doesn't exist
-        String filename = store.getOutputFile().getFileName() ;
+
+        StoreFunc sf = store.getStoreFunc();
+        String outLoc = store.getOutputFile().getFileName();
+        Job dummyJob;
+        String errMsg = "Unexpected error. Could not validate the output " +
+        		"specification for: "+outLoc;
+        int errCode = 2116;
         
         try {
-            if (checkFileExists(filename)) {
-                byte errSrc = pigCtx.getErrorSource();
-                int errCode = 0;
-                switch(errSrc) {
-                case PigException.BUG:
-                    errCode = 2002;
-                    break;
-                case PigException.REMOTE_ENVIRONMENT:
-                    errCode = 6000;
-                    break;
-                case PigException.USER_ENVIRONMENT:
-                    errCode = 4000;
-                    break;
-                }
-                String msg = "The output file(s): " + filename 
-                + " already exists";
-                msgCollector.collect(msg, MessageType.Error) ;
-                throw new PlanValidationException(msg, errCode, errSrc);                
-            }
-        } catch (PlanValidationException pve) {
-            throw pve;
+            dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
+            sf.setStoreLocation(outLoc, dummyJob);
+        } catch (IOException ioe) {
+            msgCollector.collect(errMsg, MessageType.Error) ;
+            throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ioe);
+        }
+        try {
+            sf.getOutputFormat().checkOutputSpecs(dummyJob);
         } catch (IOException ioe) {
             byte errSrc = pigCtx.getErrorSource();
-            int errCode = 0;
+            errCode = 0;
             switch(errSrc) {
             case PigException.BUG:
-                errCode = 2003;
+                errCode = 2002;
                 break;
             case PigException.REMOTE_ENVIRONMENT:
-                errCode = 6001;
+                errCode = 6000;
                 break;
             case PigException.USER_ENVIRONMENT:
-                errCode = 4001;
+                errCode = 4000;
                 break;
             }
-
-            String msg = "Cannot read from the storage where the output " 
-                    + filename + " will be stored ";
-            msgCollector.collect(msg, MessageType.Error) ;
-            throw new PlanValidationException(msg, errCode, errSrc, ioe);
-        } catch (Exception e) {
-            int errCode = 2116;
-            String msg = "Unexpected error. Could not check for the existence of the file(s): " + filename;
-            msgCollector.collect(msg, MessageType.Error) ;
-            throw new PlanValidationException(msg, errCode, PigException.BUG, e);
+            errMsg = "Output specification is invalid: "+outLoc;
+            msgCollector.collect(errMsg, MessageType.Error) ;
+            throw new PlanValidationException(errMsg, errCode, errSrc, ioe);
+        } catch (InterruptedException ie) {
+            msgCollector.collect(errMsg, MessageType.Error) ;
+            throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ie);
         }
     }
-
-    /***
-     * Check if the file(s) exist. There are two cases :-
-     * 1) Exact match
-     * 2) Globbing match
-     * TODO: Add globbing support in local execution engine 
-     * and then make this check for local FS support too
-     */
-    private boolean checkFileExists(String filename) throws IOException {
-        if (pigCtx.getExecType() == ExecType.LOCAL) {
-            ElementDescriptor elem = pigCtx.getLfs().asElement(filename) ;
-            return elem.exists() ;
-        }
-        else if (pigCtx.getExecType() == ExecType.MAPREDUCE) {
-            // TODO: Have to put the staging from local to HDFS somewhere else
-            // This does actual file check + glob check
-            return FileLocalizer.fileExists(filename, pigCtx) ;
-        }
-        else { // if ExecType is something else) 
-            throw new RuntimeException("Undefined state in " + this.getClass()) ;
-        }
-    }
-
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java Wed Feb 17 18:22:45 2010
@@ -78,10 +78,10 @@
                 LoadFunc.getAbsolutePath(absPath, curHdfsDir));      
     }
     
-    @Test(expected=FrontendException.class)
+    @Test
     public void testGetAbsolutePath4() throws IOException {
-        // test case: incompatible schemes
-        Assert.assertEquals("hdfs://myhost:123455/data/passwd",
+        // test case: non dfs scheme
+        Assert.assertEquals("http://myhost:12345/data/passwd",
                 LoadFunc.getAbsolutePath("http://myhost:12345/data/passwd", 
                 curHdfsDir));      
     }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Wed Feb 17 18:22:45 2010
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
@@ -63,9 +64,13 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.InputOutputFileVisitor;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanValidationException;
 import org.apache.pig.pen.physicalOperators.POCounter;
 import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.LogicalPlanTester;
 import org.apache.pig.test.utils.TestHelper;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.After;
@@ -125,6 +130,56 @@
     }
 
     @Test
+    public void testValidation() throws Exception{
+        
+        String outputFileName = "test-output.txt";
+        try {
+            LogicalPlanTester lpt = new LogicalPlanTester();
+            lpt.buildPlan("a = load '" + inputFileName + "' as (c:chararray, " +
+                    "i:int,d:double);");
+            LogicalPlan lp = lpt.buildPlan("store a into '" + outputFileName + "' using " +
+                    "PigStorage();");
+            InputOutputFileVisitor visitor = new InputOutputFileVisitor(lp, null, pig.getPigContext());
+            visitor.visit();
+        } catch (PlanValidationException e){
+                // Since output file is not present, validation should pass
+                // and not throw this exception.
+                fail("Store validation test failed.");                
+        } finally {
+            Util.deleteFile(pig.getPigContext(), outputFileName);
+        }
+    }
+    
+    @Test
+    public void testValidationFailure() throws Exception{
+        
+        String input[] = new String[] { "some data" };
+        String outputFileName = "test-output.txt";
+        boolean sawException = false;
+        try {
+            Util.createInputFile(pig.getPigContext(),outputFileName, input);
+            LogicalPlanTester lpt = new LogicalPlanTester(pig.getPigContext());
+            lpt.buildPlan("a = load '" + inputFileName + "' as (c:chararray, " +
+                    "i:int,d:double);");
+            LogicalPlan lp = lpt.buildPlan("store a into '" + outputFileName + 
+                    "' using PigStorage();");
+            InputOutputFileVisitor visitor = new InputOutputFileVisitor(lp, 
+                    new CompilationMessageCollector(), pig.getPigContext());
+            visitor.visit();
+        } catch (PlanValidationException pve){
+            // Since output file is present, validation should fail
+            // and throw this exception 
+            assertEquals(6000,pve.getErrorCode());
+            assertEquals(PigException.REMOTE_ENVIRONMENT, pve.getErrorSource());
+            assertTrue(pve.getCause() instanceof IOException);
+            sawException = true;
+        } finally {
+            assertTrue(sawException);
+            Util.deleteFile(pig.getPigContext(), outputFileName);
+        }
+    }
+    
+    @Test
     public void testStore() throws Exception {
         inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
         storeAndCopyLocally(inpDB);
@@ -402,7 +457,6 @@
         public void storeStatistics(ResourceStatistics stats, String location,
                 Configuration conf) throws IOException {
         }
-        
     }
     
     private void checkStorePath(String orig, String expected) throws Exception {

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=911135&r1=911134&r2=911135&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LogicalPlanTester.java Wed Feb 17 18:22:45 2010
@@ -59,11 +59,17 @@
     private Map<OperatorKey, LogicalOperator> logicalOpTable = null ;
     private Map<String, LogicalOperator> aliasOp = null ;
     private Map<String, String> fileNameMap = null ;
+    private PigContext pigContext;
 
     public LogicalPlanTester() {
-        reset() ;
+        this(new PigContext(ExecType.MAPREDUCE, new Properties()));
     }
 
+    public LogicalPlanTester(PigContext pc) {
+        pigContext = pc;
+        reset() ;
+    }
+    
     /***
      * Reset state
      */
@@ -203,7 +209,6 @@
     private LogicalPlan buildPlan(String query, ClassLoader cldr) {
 
         LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
-        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
         try {
             pigContext.connect();
         } catch (ExecException e1) {
@@ -252,7 +257,7 @@
     private LogicalPlan buildPlanThrowExceptionOnError (String query, ClassLoader cldr) throws IOException, ParseException {
 
         LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
-        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+        
         try {
             pigContext.connect();
         } catch (ExecException e1) {