Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/02 02:01:52 UTC

svn commit: r917827 [1/2] - in /hadoop/pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/test/org/apache/hadoop/zebra/pig/ src/org/apache/pig/ src/...

Author: pradeepkth
Date: Tue Mar  2 01:01:51 2010
New Revision: 917827

URL: http://svn.apache.org/viewvc?rev=917827&view=rev
Log:
PIG-1265: Change LoadMetadata and StoreMetadata to use Job instead of Configuration and add a cleanupOnFailure method to StoreFuncInterface (pradeepkth)
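
For implementers tracking this change: the LoadMetadata and StoreMetadata callbacks now receive a Job instead of a raw Configuration, and cluster properties must be read through Job#getConfiguration(). Below is a minimal sketch of a LoadMetadata implementation against the new signatures; the class name and method bodies are illustrative only and are not part of this commit.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.Expression;
    import org.apache.pig.LoadMetadata;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;

    public class NullLoadMetadata implements LoadMetadata {
        @Override
        public ResourceSchema getSchema(String location, Job job)
        throws IOException {
            // read-only access to cluster properties, per the new javadoc
            job.getConfiguration().get("fs.default.name");
            return null; // schema unknown
        }

        @Override
        public ResourceStatistics getStatistics(String location, Job job)
        throws IOException {
            return null; // no statistics available
        }

        @Override
        public String[] getPartitionKeys(String location, Job job)
        throws IOException {
            return null; // no partition keys
        }

        @Override
        public void setPartitionFilter(Expression partitionFilter)
        throws IOException {
            // not called when getPartitionKeys() returns null
        }
    }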

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
    hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
    hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
    hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
    hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar  2 01:01:51 2010
@@ -22,6 +22,10 @@
 
 INCOMPATIBLE CHANGES
 
+PIG-1265: Change LoadMetadata and StoreMetadata to use Job instead of
+Configuration and add a cleanupOnFailure method to StoreFuncInterface
+(pradeepkth)
+
 PIG-1250:  Make StoreFunc an abstract class and create a mirror interface
 called StoreFuncInterface (pradeepkth)
 

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java Tue Mar  2 01:01:51 2010
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.StoreMetadata;
@@ -131,7 +132,7 @@
     // Implementation of LoadMetaData interface
     
     @Override
-    public String[] getPartitionKeys(String location, Configuration conf) {
+    public String[] getPartitionKeys(String location, Job job) {
         return null;
     }
 
@@ -147,7 +148,8 @@
      * TODO location and conf params are ignored in favor of initialization data
      */
     @Override
-    public ResourceSchema getSchema(String location, Configuration conf) throws IOException {     
+    public ResourceSchema getSchema(String location, Job job) throws IOException {     
+        Configuration conf = job.getConfiguration();
         Set<ElementDescriptor> schemaFileSet = null;
         try {
             schemaFileSet = findMetaFile(location, schemaFileName, conf);
@@ -188,7 +190,8 @@
      * @see org.apache.pig.LoadMetadata#getStatistics(String, Configuration)
      */
     @Override
-    public ResourceStatistics getStatistics(String location, Configuration conf) throws IOException {
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
         Set<ElementDescriptor> statFileSet = null;
         try {
             statFileSet = findMetaFile(location, statFileName, conf);
@@ -224,7 +227,8 @@
     // Implementation of StoreMetaData interface
     
     @Override
-    public void storeStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException {
+    public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
         DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
         ElementDescriptor statFilePath = storage.asElement(location, statFileName);
         if(!statFilePath.exists() && stats != null) {
@@ -241,7 +245,8 @@
     }
 
     @Override
-    public void storeSchema(ResourceSchema schema, String location, Configuration conf) throws IOException {
+    public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
         DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
         ElementDescriptor schemaFilePath = storage.asElement(location, schemaFileName);
         if(!schemaFilePath.exists() && schema != null) {

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java Tue Mar  2 01:01:51 2010
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
@@ -55,13 +56,13 @@
     
     @Override
     public ResourceSchema getSchema(String location,
-            Configuration conf) throws IOException {
-        return (new JsonMetadata()).getSchema(location, conf);
+            Job job) throws IOException {
+        return (new JsonMetadata()).getSchema(location, job);
     }
 
     @Override
     public ResourceStatistics getStatistics(String location,
-            Configuration conf) throws IOException {        
+            Job job) throws IOException {        
         return null;
     }
 
@@ -71,7 +72,7 @@
     }
     
     @Override
-    public String[] getPartitionKeys(String location, Configuration conf)
+    public String[] getPartitionKeys(String location, Job job)
             throws IOException {
         return null;
     }
@@ -81,18 +82,18 @@
 
     @Override
     public void storeSchema(ResourceSchema schema, String location,
-            Configuration conf) throws IOException {
+            Job job) throws IOException {
         JsonMetadata metadataWriter = new JsonMetadata();
         byte fieldDel = '\t';
         byte recordDel = '\n';
         metadataWriter.setFieldDel(fieldDel);
         metadataWriter.setRecordDel(recordDel);
-        metadataWriter.storeSchema(schema, location, conf);               
+        metadataWriter.storeSchema(schema, location, job);               
     }
 
     @Override
     public void storeStatistics(ResourceStatistics stats, String location,
-            Configuration conf) throws IOException {
+            Job job) throws IOException {
         
     }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Tue Mar  2 01:01:51 2010
@@ -280,15 +280,14 @@
      }
 
      @Override
-     public String[] getPartitionKeys(String location, Configuration conf)
+     public String[] getPartitionKeys(String location, Job job)
      throws IOException {
          return null;
      }
 
      @Override
-     public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
-         Path[] paths = getPathsFromLocation( location, conf );
-         Job job = new Job(conf);
+     public ResourceSchema getSchema(String location, Job job) throws IOException {
+         Path[] paths = getPathsFromLocation( location, job.getConfiguration());
          TableInputFormat.setInputPaths( job, paths );
 
          Schema tableSchema = null;
@@ -325,7 +324,7 @@
      }
 
      @Override
-     public ResourceStatistics getStatistics(String location, Configuration conf)
+     public ResourceStatistics getStatistics(String location, Job job)
      throws IOException {
          // Statistics is not supported.
          return null;

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Mar  2 01:01:51 2010
@@ -163,10 +163,11 @@
     }
 
     @Override
-    public void storeSchema(ResourceSchema schema, String location, Configuration conf)
+    public void storeSchema(ResourceSchema schema, String location, Job job)
     throws IOException {
     	// no-op. We do close at cleanupJob().
-        BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
+        BasicTable.Writer write = new BasicTable.Writer( new Path( location ), 
+                job.getConfiguration());
         write.close();
     }
 
@@ -177,7 +178,7 @@
 
     @Override
     public void storeStatistics(ResourceStatistics stats, String location,
-            Configuration conf) throws IOException {
+            Job job) throws IOException {
         // no-op
     }
 

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Tue Mar  2 01:01:51 2010
@@ -437,15 +437,19 @@
       System.out.println(RowValue);
     }
 
-    Path newPath = new Path(getCurrentMethodName());
-    ExecJob pigJob = pigServer
-        .store(
-            "records",
-            new Path(newPath, "store").toString(),
-            TableStorer.class.getCanonicalName()
-                + "('[s7, s2]; [s3, s4]')");
-    Assert.assertNotNull(pigJob.getException());
-    System.out.println(pigJob.getException());
+    try {
+        Path newPath = new Path(getCurrentMethodName());
+        ExecJob pigJob = pigServer
+            .store(
+                "records",
+                new Path(newPath, "store").toString(),
+                TableStorer.class.getCanonicalName()
+                    + "('[s7, s2]; [s3, s4]')");
+    } catch (Exception e) {
+        System.out.println(e);
+        return;
+    }
+    Assert.fail("Exception expected");
   }
 
   @Test
@@ -462,16 +466,20 @@
       System.out.println(RowValue);
     }
 
-    Path newPath = new Path(getCurrentMethodName());
-    
-    ExecJob pigJob = pigServer
-          .store(
-              "records",
-              new Path(newPath, "store").toString(),
-              TableStorer.class.getCanonicalName()
-                  + "('[s1, s2]; [s1, s4]')");
-      Assert.assertNotNull(pigJob.getException());
-      System.out.println(pigJob.getException());
+    try {
+        Path newPath = new Path(getCurrentMethodName());
+        
+        ExecJob pigJob = pigServer
+              .store(
+                  "records",
+                  new Path(newPath, "store").toString(),
+                  TableStorer.class.getCanonicalName()
+                      + "('[s1, s2]; [s1, s4]')");
+    } catch(Exception e) {
+        System.out.println(e);
+        return;
+    }
+    Assert.fail("Exception expected");
   }
 
   @Test
@@ -487,17 +495,21 @@
       Tuple RowValue = it.next();
       System.out.println(RowValue);
     }
+    try{
+        Path newPath = new Path(getCurrentMethodName());
+    
+        ExecJob pigJob = pigServer
+            .store(
+                "records",
+                new Path(newPath, "store").toString(),
+                TableStorer.class.getCanonicalName()
+                    + "('[s1]; [s1]')");
+    } catch(Exception e) {
+        System.out.println(e);
+        return;
+    }
+    Assert.fail("Exception expected");
 
-    Path newPath = new Path(getCurrentMethodName());
-
-    ExecJob pigJob = pigServer
-        .store(
-            "records",
-            new Path(newPath, "store").toString(),
-            TableStorer.class.getCanonicalName()
-                + "('[s1]; [s1]')");
-    Assert.assertNotNull(pigJob.getException());
-    System.out.println(pigJob.getException());
   }
 
   // @Test
@@ -541,13 +553,17 @@
     // Use pig STORE to store testing data
     //
     System.out.println("path = " + path);
-    ExecJob pigJob = pigServer
-        .store(
-            "records",
-            path.toString(),
-            TableStorer.class.getCanonicalName()
-                + "('[s1, s2]; [s3, s4]')");
-    Assert.assertNotNull(pigJob.getException());
-    System.out.println("pig job exception : " + pigJob.getException());
+    try {
+        ExecJob pigJob = pigServer
+            .store(
+                "records",
+                path.toString(),
+                TableStorer.class.getCanonicalName()
+                    + "('[s1, s2]; [s3, s4]')");
+    } catch(Exception e) {
+        System.out.println(e);
+        return;
+    }
+    Assert.fail("Exception expected");
   }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java Tue Mar  2 01:01:51 2010
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * This interface defines how to retrieve metadata related to data to be loaded.
@@ -32,14 +33,16 @@
      * Get a schema for the data to be loaded.  
      * @param location Location as returned by 
      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-     * @param conf The {@link Configuration} object 
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.  
      * @return schema for the data to be loaded. This schema should represent
      * all tuples of the returned data.  If the schema is unknown or it is
      * not possible to return a schema that represents all returned data,
      * then null should be returned.
      * @throws IOException if an exception occurs while determining the schema
      */
-    ResourceSchema getSchema(String location, Configuration conf) throws 
+    ResourceSchema getSchema(String location, Job job) throws 
     IOException;
 
     /**
@@ -47,31 +50,35 @@
      * available, then null should be returned.
      * @param location Location as returned by 
      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-     * @param conf The {@link Configuration} object
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.  
      * @return statistics about the data to be loaded.  If no statistics are
      * available, then null should be returned.
      * @throws IOException if an exception occurs while retrieving statistics
      */
-    ResourceStatistics getStatistics(String location, Configuration conf) 
+    ResourceStatistics getStatistics(String location, Job job) 
     throws IOException;
 
     /**
      * Find what columns are partition keys for this input.
      * @param location Location as returned by 
      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-     * @param conf The {@link Configuration} object
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.  
      * @return array of field names of the partition keys. Implementations 
      * should return null to indicate that there are no partition keys
      * @throws IOException if an exception occurs while retrieving partition keys
      */
-    String[] getPartitionKeys(String location, Configuration conf) 
+    String[] getPartitionKeys(String location, Job job) 
     throws IOException;
 
     /**
      * Set the filter for partitioning.  It is assumed that this filter
      * will only contain references to fields given as partition keys in
      * getPartitionKeys. So if the implementation returns null in 
-     * {@link #getPartitionKeys(String, Configuration)}, then this method is not
+     * {@link #getPartitionKeys(String, Job)}, then this method is not
      * called by pig runtime. This method is also not called by the pig runtime
      * if there are no partition filter conditions. 
      * @param partitionFilter that describes filter for partitioning

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Mar  2 01:01:51 2010
@@ -36,6 +36,7 @@
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.pig.impl.plan.PlanException;
@@ -70,7 +71,9 @@
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -539,7 +542,7 @@
         }
 
         try {
-            LogicalPlan lp = compileLp(id);
+            LogicalPlan lp = clonePlan(id);
 
             // MRCompiler needs a store to be the leaf - hence
             // add a store to the plan to explain
@@ -557,7 +560,8 @@
                 }
             }
             
-            LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, leaf.getAlias(), pigContext);
+            LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, leaf.getAlias(), pigContext);
+            LogicalPlan storePlan = compileLp(unCompiledstorePlan, true);
             List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
             if (jobs.size() < 1) {
                 throw new IOException("Couldn't retrieve job.");
@@ -813,8 +817,13 @@
         List<ExecJob> execJobs = pigContext.getExecutionEngine().execute(pp, "job_pigexec_");
         for (ExecJob execJob: execJobs) {
             if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED) {
-                FileLocalizer.triggerDeleteOnFail();
-                break;
+                POStore store = execJob.getPOStore();
+                try {
+                    store.getStoreFunc().cleanupOnFailure(store.getSFile().getFileName(),
+                            new Job(ConfigurationUtil.toConfiguration(execJob.getConfiguration())));
+                } catch (IOException e) {
+                    throw new ExecException(e);
+                }
             }
         }
         return execJobs;
@@ -841,39 +850,21 @@
             String msg = "Unable to clone plan before compiling";
             throw new FrontendException(msg, errCode, PigException.BUG, e);
         }
-        
+        return compileLp(lpClone, optimize);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private  LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
+    FrontendException {
         // Set the logical plan values correctly in all the operators
-        PlanSetter ps = new PlanSetter(lpClone);
+        PlanSetter ps = new PlanSetter(lp);
         ps.visit();
         
-        SortInfoSetter sortInfoSetter = new SortInfoSetter(lpClone);
-        sortInfoSetter.visit();
-        
         // run through validator
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        FrontendException caught = null;
-        try {
-            LogicalPlanValidationExecutor validator = 
-                new LogicalPlanValidationExecutor(lpClone, pigContext);
-            validator.validate(lpClone, collector);
-        } catch (FrontendException fe) {
-            // Need to go through and see what the collector has in it.  But
-            // remember what we've caught so we can wrap it into what we
-            // throw.
-            caught = fe;            
-        }
-        
-        if(aggregateWarning) {
-            CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
-        } else {
-            for(Enum type: MessageType.values()) {
-                CompilationMessageCollector.logAllMessages(collector, log);
-            }
-        }
+        boolean isBeforeOptimizer = true;
+        validate(lp, collector, isBeforeOptimizer);
         
-        if (caught != null) {
-            throw caught;
-        }
 
         // optimize
         if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
@@ -888,11 +879,19 @@
                 throw new FrontendException(msg, errCode, PigException.BUG, ioe);
             }
 
-            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType(), optimizerRules);
+            LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
             optimizer.optimize();
         }
 
-        return lpClone;
+        // compute whether output data is sorted or not
+        SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
+        sortInfoSetter.visit();
+        
+        // run validations to be done after optimization
+        isBeforeOptimizer = false;
+        validate(lp, collector, isBeforeOptimizer);
+        
+        return lp;
     }
 
     private PhysicalPlan compilePp(LogicalPlan lp) throws ExecException {
@@ -904,6 +903,32 @@
         return pp;
     }
 
+    private void validate(LogicalPlan lp, CompilationMessageCollector collector,
+            boolean isBeforeOptimizer) throws FrontendException {
+        FrontendException caught = null;
+        try {
+            LogicalPlanValidationExecutor validator = 
+                new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
+            validator.validate(lp, collector);
+        } catch (FrontendException fe) {
+            // Need to go through and see what the collector has in it.  But
+            // remember what we've caught so we can wrap it into what we
+            // throw.
+            caught = fe;            
+        }
+        
+        if(aggregateWarning) {
+            CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
+        } else {
+            for(Enum type: MessageType.values()) {
+                CompilationMessageCollector.logAllMessages(collector, log);
+            }
+        }
+        
+        if (caught != null) {
+            throw caught;
+        }
+    }
     private LogicalPlan getPlanFromAlias(
             String alias,
             String operation) throws FrontendException {
@@ -1128,6 +1153,7 @@
             }
         }
 
+        @Override
         protected Graph clone() {
             // There are two choices on how we clone the logical plan
             // 1 - we really clone each operator and connect up the cloned operators

Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java Tue Mar  2 01:01:51 2010
@@ -19,12 +19,12 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
 
 
@@ -54,6 +54,7 @@
      * @throws IOException 
      * @throws IOException if the conversion is not possible
      */
+    @Override
     public String relToAbsPathForStoreLocation(String location, Path curDir) 
     throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
@@ -71,12 +72,9 @@
     public abstract OutputFormat getOutputFormat() throws IOException;
 
     /**
-     * Communicate to the store function the location used in Pig Latin to refer 
-     * to the object(s) being stored.  That is, if the PL script is
-     * <b>store A into 'bla'</b>
-     * then 'bla' is the location.  This location should be either a file name
-     * or a URI.  If it does not have a URI scheme Pig will assume it is a 
-     * filename.  
+     * Communicate to the storer the location where the data needs to be stored.  
+     * The location string passed to the {@link StoreFunc} here is the 
+     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}.
      * This method will be called in the frontend and backend multiple times. Implementations
      * should bear in mind that this method is called multiple times and should
      * ensure there are no inconsistent side effects due to the multiple calls.
@@ -84,7 +82,8 @@
      * {@link #setStoreLocation(String, Job)}.
      * 
 
-     * @param location Location indicated in store statement.
+     * @param location Location returned by 
+     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
      */
@@ -102,6 +101,7 @@
      * @throws IOException if this schema is not acceptable.  It should include
      * a detailed error message indicating what is wrong with the schema.
      */
+    @Override
     public void checkSchema(ResourceSchema s) throws IOException {
         // default implementation is a no-op
     }
@@ -131,7 +131,41 @@
      * will be called before other methods in {@link StoreFunc}.
      * @param signature a unique signature to identify this StoreFunc
      */
+    @Override
     public void setStoreFuncUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
+    
+    /**
+     * This method will be called by Pig if the job which contains this store
+     * fails. Implementations can clean up output locations in this method to
+     * ensure that no incorrect/incomplete results are left in the output location.
+     * The implementation in {@link StoreFunc} deletes the output location if it
+     * is a {@link FileSystem} location.
+     * @param location Location returned by 
+     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information. 
+     */
+    @Override
+    public void cleanupOnFailure(String location, Job job) 
+    throws IOException {
+        cleanupOnFailureImpl(location, job);
+    }
+    
+    /**
+     * Implementation for {@link #cleanupOnFailure(String, Job)}
+     * @param location
+     * @param job
+     * @throws IOException
+     */
+    public static void cleanupOnFailureImpl(String location, Job job) 
+    throws IOException {
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        Path path = new Path(location);
+        if(fs.exists(path)){
+            fs.delete(path, true);
+        }    
+    }
 }
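
The default cleanupOnFailure added above deletes the output path through FileSystem. A storer that needs extra recovery work can override it and still delegate to the shared implementation. A hedged sketch follows; the class, its output format choice, and the logging are hypothetical, not from this commit.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.data.Tuple;

    public class LoggingStorer extends StoreFunc {
        private RecordWriter writer;

        @Override
        public OutputFormat getOutputFormat() throws IOException {
            return new TextOutputFormat<Object, Tuple>();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            // location is already absolute (see relToAbsPathForStoreLocation)
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        public void prepareToWrite(RecordWriter writer) throws IOException {
            this.writer = writer;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void putNext(Tuple t) throws IOException {
            try {
                writer.write(null, t);
            } catch (InterruptedException e) {
                throw new IOException("interrupted while writing", e);
            }
        }

        @Override
        public void cleanupOnFailure(String location, Job job) throws IOException {
            // custom bookkeeping first, then the default FileSystem delete
            System.err.println("store failed, cleaning up " + location);
            cleanupOnFailureImpl(location, job);
        }
    }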

Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java Tue Mar  2 01:01:51 2010
@@ -68,18 +68,16 @@
     OutputFormat getOutputFormat() throws IOException;
 
     /**
-     * Communicate to the store function the location used in Pig Latin to refer 
-     * to the object(s) being stored.  That is, if the PL script is
-     * <b>store A into 'bla'</b>
-     * then 'bla' is the location.  This location should be either a file name
-     * or a URI.  If it does not have a URI scheme Pig will assume it is a 
-     * filename.  
+     * Communicate to the storer the location where the data needs to be stored.  
+     * The location string passed to the {@link StoreFuncInterface} here is the 
+     * return value of {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}.
      * This method will be called in the frontend and backend multiple times. Implementations
      * should bear in mind that this method is called multiple times and should
      * ensure there are no inconsistent side effects due to the multiple calls.
      * 
 
-     * @param location Location indicated in store statement.
+     * @param location Location returned by 
+     * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
      */
@@ -124,4 +122,16 @@
      * @param signature a unique signature to identify this StoreFuncInterface
      */
     public void setStoreFuncUDFContextSignature(String signature);
+
+    /**
+     * This method will be called by Pig if the job which contains this store
+     * fails. Implementations can clean up output locations in this method to
+     * ensure that no incorrect/incomplete results are left in the output location.
+     * @param location Location returned by 
+     * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information. 
+     */
+    void cleanupOnFailure(String location, Job job) throws IOException;
 }
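
Classes that implement StoreFuncInterface directly, rather than extending the StoreFunc abstract class, must now supply cleanupOnFailure themselves. For FileSystem-backed output the delegation that this patch adds to PigStorage and BinStorage (see below) is sufficient:

    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
        // delegate to the shared FileSystem-based cleanup in StoreFunc
        StoreFunc.cleanupOnFailureImpl(location, job);
    }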

Modified: hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java Tue Mar  2 01:01:51 2010
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 
@@ -34,15 +35,19 @@
 
     /**
      * Store statistics about the data being written.
-     * 
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.  
      * @throws IOException 
      */
-    void storeStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException;
+    void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException;
 
     /**
      * Store schema of the data being written
-     * 
+     * @param job The {@link Job} object - this should be used only to obtain 
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.  
      * @throws IOException 
      */
-    void storeSchema(ResourceSchema schema, String location, Configuration conf) throws IOException;
+    void storeSchema(ResourceSchema schema, String location, Job job) throws IOException;
 }
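
Correspondingly, StoreMetadata implementers now receive the Job in both callbacks; the JsonMetadata change above shows the typical migration (fetch the Configuration once via job.getConfiguration()). A minimal no-op implementation under the new signatures, with an illustrative class name, would be:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;
    import org.apache.pig.StoreMetadata;

    public class NullStoreMetadata implements StoreMetadata {
        @Override
        public void storeStatistics(ResourceStatistics stats, String location,
                Job job) throws IOException {
            // no statistics persisted
        }

        @Override
        public void storeSchema(ResourceSchema schema, String location,
                Job job) throws IOException {
            // read cluster properties only; do not mutate the running job
            job.getConfiguration();
        }
    }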

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Tue Mar  2 01:01:51 2010
@@ -22,6 +22,7 @@
 import java.util.Properties;
 import java.io.OutputStream;
 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.tools.pigstats.PigStats;
 
@@ -80,6 +81,12 @@
      * @return statistics relevant to the execution engine
      */
     public PigStats getStatistics();
+    
+    /**
+     * 
+     * @return {@link POStore} object associated with the store 
+     */
+    public POStore getPOStore();
 
     /**
      * hook for asynchronous notification of job completion pushed from the back-end

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Mar  2 01:01:51 2010
@@ -306,14 +306,16 @@
         try {
             PigStats stats = launcher.launchPig(plan, jobName, pigContext);
 
-            for (FileSpec spec: launcher.getSucceededFiles()) {
+            for (POStore store: launcher.getSucceededFiles()) {
+                FileSpec spec = store.getSFile();
                 String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
-                jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, alias, stats));
+                jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, store, alias, stats));
             }
 
-            for (FileSpec spec: launcher.getFailedFiles()) {
+            for (POStore store: launcher.getFailedFiles()) {
+                FileSpec spec = store.getSFile();
                 String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
-                HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, spec, alias, stats);
+                HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, store, alias, stats);
                 j.setException(launcher.getError(spec));
                 jobs.add(j);
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Tue Mar  2 01:01:51 2010
@@ -19,23 +19,20 @@
 package org.apache.pig.backend.hadoop.executionengine;
 
 import java.io.OutputStream;
-import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.tools.pigstats.PigStats;
 
@@ -49,26 +46,29 @@
     protected FileSpec outFileSpec;
     protected Exception backendException;
     protected String alias;
+    protected POStore poStore;
     private PigStats stats;
     
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
-                FileSpec outFileSpec,
+                POStore store,
                 String alias) {
         this.status = status;
         this.pigContext = pigContext;
-        this.outFileSpec = outFileSpec;
+        this.poStore = store;
+        this.outFileSpec = poStore.getSFile();
         this.alias = alias;
     }
     
     public HJob(JOB_STATUS status,
             PigContext pigContext,
-            FileSpec outFileSpec,
+            POStore store,
             String alias,
             PigStats stats) {
         this.status = status;
         this.pigContext = pigContext;
-        this.outFileSpec = outFileSpec;
+        this.poStore = store;
+        this.outFileSpec = poStore.getSFile();
         this.alias = alias;
         this.stats = stats;
     }
@@ -143,8 +143,7 @@
     }
 
     public Properties getConfiguration() {
-        Properties props = new Properties();
-        return props;
+        return pigContext.getProperties();
     }
 
     public PigStats getStatistics() {
@@ -184,4 +183,12 @@
     public String getAlias() throws ExecException {
         return alias;
     }
+
+    /**
+     * @return the poStore
+     */
+    @Override
+    public POStore getPOStore() {
+        return poStore;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Mar  2 01:01:51 2010
@@ -133,7 +133,6 @@
      */
     public static final String PIG_MAP_STORES = "pig.map.stores";
     public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
-    public static final String PIG_STORE_FUNC = "pig.storeFunc";
     
     // A mapping of job to pair of store locations and tmp locations for that job
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
@@ -460,13 +459,6 @@
                 String outputPath = st.getSFile().getFileName();
                 FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
                 
-                // serialize the store func spec using ObjectSerializer
-                // ObjectSerializer.serialize() uses default java serialization
-                // and then further encodes the output so that control characters
-                // get encoded as regular characters. Otherwise any control characters
-                // in the store funcspec would break the job.xml which is created by
-                // hadoop from the jobconf.
-                conf.set(PIG_STORE_FUNC, ObjectSerializer.serialize(outputFuncSpec.toString()));
                 conf.set("pig.streaming.log.dir", 
                             new Path(outputPath, LOG_DIR).toString());
                 conf.set("pig.streaming.task.output.dir", outputPath);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Tue Mar  2 01:01:51 2010
@@ -20,17 +20,16 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
@@ -40,17 +39,13 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public abstract class Launcher {
@@ -62,8 +57,8 @@
     boolean outOfMemory = false;
     static final String OOM_ERR = "OutOfMemoryError";
 
-    protected List<FileSpec> succeededStores = null;
-    protected List<FileSpec> failedStores = null;
+    protected List<POStore> succeededStores = null;
+    protected List<POStore> failedStores = null;
     
     protected Launcher(){
         totalHadoopTimeSpent = 0;
@@ -75,21 +70,19 @@
     }
 
     /**
-     * Returns a list of locations of results that have been
-     * successfully completed.
-     * @return A list of filspecs that corresponds to the locations of
-     * the successful stores.
+     *
+     * @return A list of {@link POStore} objects corresponding to the store
+     * statements that were successful
      */
-    public List<FileSpec> getSucceededFiles() {
+    public List<POStore> getSucceededFiles() {
         return succeededStores;
     }
 
     /**
-     * Returns a list of locations of results that have failed.
-     * @return A list of filspecs that corresponds to the locations of
-     * the failed stores.
+     * @return A list of {@link POStore} objects corresponding to the store
+     * statements that failed
      */
-    public List<FileSpec> getFailedFiles() {
+    public List<POStore> getFailedFiles() {
         return failedStores;
     }
 
@@ -97,8 +90,8 @@
      * Resets the state after a launch
      */
     public void reset() {
-        succeededStores = new LinkedList<FileSpec>();
-        failedStores = new LinkedList<FileSpec>();
+        succeededStores = new LinkedList<POStore>();
+        failedStores = new LinkedList<POStore>();
     }
 
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Mar  2 01:01:51 2010
@@ -276,9 +276,8 @@
                         finalStores++;
                         log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
                     }
-                    failedStores.add(st.getSFile());
+                    failedStores.add(st);
                     failureMap.put(st.getSFile(), backendException);
-                    FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
                     //log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
                 }
             }
@@ -304,7 +303,7 @@
                         storeSchema(job, st);
                     }
                     if (!st.isTmpStore()) {
-                        succeededStores.add(st.getSFile());
+                        succeededStores.add(st);
                         finalStores++;
                         log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
                     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Mar  2 01:01:51 2010
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -129,9 +130,7 @@
         // call setLocation() on the storeFunc so that if there are any
         // side effects like setting map.output.dir on the Configuration
         // in the Context are needed by the OutputCommitter, those actions
-        // will be done before the committer is created. Also the String 
-        // version of StoreFunc for the specific store need
-        // to be set up in the context in case the committer needs them
+        // will be done before the committer is created. 
         PigOutputFormat.setLocation(contextCopy, store);
         return contextCopy;   
     }
@@ -161,14 +160,11 @@
             if (schema != null) {
                 ((StoreMetadata) storeFunc).storeSchema(
                         new ResourceSchema(schema, store.getSortInfo()), store.getSFile()
-                                .getFileName(), conf);
+                                .getFileName(), new Job(conf));
             }
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext)
-     */
     @Override
     public void cleanupJob(JobContext context) throws IOException {
         // call clean up on all map and reduce committers
@@ -188,9 +184,6 @@
        
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {        
         if(context.getTaskAttemptID().isMap()) {
@@ -210,9 +203,6 @@
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
         if(context.getTaskAttemptID().isMap()) {
@@ -232,9 +222,6 @@
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
     @Override
     public boolean needsTaskCommit(TaskAttemptContext context)
             throws IOException {
@@ -260,9 +247,6 @@
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#setupJob(org.apache.hadoop.mapreduce.JobContext)
-     */
     @Override
     public void setupJob(JobContext context) throws IOException {
         // call set up on all map and reduce committers
@@ -279,9 +263,6 @@
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
         if(context.getTaskAttemptID().isMap()) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Mar  2 01:01:51 2010
@@ -178,12 +178,6 @@
         // supplied as input has the updates.
         ConfigurationUtil.mergeConf(jobContext.getConfiguration(), 
                 storeJob.getConfiguration());
-        
-        // Before delegating calls to underlying OutputFormat or OutputCommitter
-        // Pig needs to ensure the Configuration in the JobContext contains
-        // StoreFunc for the specific store - so set this up in the context 
-        // for this specific store
-        updateContextWithStoreInfo(jobContext, store);
     }
 
     @Override
@@ -240,21 +234,4 @@
         // will wrap the real OutputCommitter(s) belonging to the store(s)
         return new PigOutputCommitter(taskattemptcontext);
     }
-    
-    /**
-     * Before delegating calls to underlying OutputFormat or OutputCommitter
-     * Pig needs to ensure the Configuration in the {@link JobContext} contains
-     * {@link JobControlCompiler#PIG_STORE_FUNC}. This helper method can be
-     * used to set this up
-     * @param context the job context
-     * @param store the POStore whose information is to be put into the context
-     * @throws IOException in case of failure
-     */
-    public static void updateContextWithStoreInfo(JobContext context, 
-            POStore store) throws IOException {
-        Configuration conf = context.getConfiguration();
-        conf.set(JobControlCompiler.PIG_STORE_FUNC, 
-                store.getSFile().getFuncSpec().toString());
-        
-    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Mar  2 01:01:51 2010
@@ -45,6 +45,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
@@ -380,14 +381,15 @@
     }
 
     @Override
-    public String[] getPartitionKeys(String location, Configuration conf)
+    public String[] getPartitionKeys(String location, Job job)
             throws IOException {
         return null;
     }
 
     @Override
-    public ResourceSchema getSchema(String location, Configuration conf)
+    public ResourceSchema getSchema(String location, Job job)
             throws IOException {
+        Configuration conf = job.getConfiguration();
         Properties props = ConfigurationUtil.toProperties(conf);
         // since local mode is now implemented as hadoop's local mode
         // we can treat either local or hadoop mode as hadoop mode - hence
@@ -423,7 +425,7 @@
     }
 
     @Override
-    public ResourceStatistics getStatistics(String location, Configuration conf)
+    public ResourceStatistics getStatistics(String location, Job job)
             throws IOException {
         throw new UnsupportedOperationException();
     }
@@ -437,4 +439,9 @@
     public void setStoreFuncUDFContextSignature(String signature) {
     }
 
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        StoreFunc.cleanupOnFailureImpl(location, job);
+    }
+
 }
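
For third-party loaders, the BinStorage change above is the template to follow: after PIG-1265, LoadMetadata's getSchema, getStatistics and getPartitionKeys take a Job instead of a Configuration, and StoreFuncInterface gains cleanupOnFailure. Below is a minimal sketch of an adapted LoadMetadata implementation; MyLoader is hypothetical and not part of this commit, and the Configuration remains reachable through job.getConfiguration() where an implementation still needs it.

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.Expression;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.LoadMetadata;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;

    // Hypothetical loader showing the post-PIG-1265 LoadMetadata signatures.
    public abstract class MyLoader extends LoadFunc implements LoadMetadata {

        @Override
        public ResourceSchema getSchema(String location, Job job)
                throws IOException {
            // The Configuration is still available when needed:
            // Configuration conf = job.getConfiguration();
            return null; // no up-front schema
        }

        @Override
        public ResourceStatistics getStatistics(String location, Job job)
                throws IOException {
            return null; // statistics not supported
        }

        @Override
        public String[] getPartitionKeys(String location, Job job)
                throws IOException {
            return null; // no partition keys
        }

        @Override
        public void setPartitionFilter(Expression partitionFilter)
                throws IOException {
            // only invoked when getPartitionKeys returns keys
        }
    }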

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Mar  2 01:01:51 2010
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -45,6 +46,7 @@
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -135,7 +137,7 @@
       
     }
 
-    ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
+    protected ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
 
     @Override
     public void putNext(Tuple f) throws IOException {
@@ -293,4 +295,10 @@
     public void setStoreFuncUDFContextSignature(String signature) {
     }
 
+    @Override
+    public void cleanupOnFailure(String location, Job job)
+            throws IOException {
+        StoreFunc.cleanupOnFailureImpl(location, job);
+    }
+
 }
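
PigStorage satisfies the new StoreFuncInterface method the same way BinStorage does, by delegating to the static StoreFunc.cleanupOnFailureImpl helper, which deletes the output location. (The other change here, making mOut protected, lets subclasses reuse PigStorage's serialization buffer.) A storer whose output is not a plain file-system directory would supply its own cleanup instead; the following is a hedged fragment of such an override, where MyMetastoreClient and its methods are hypothetical and only illustrate the shape of the hook.

    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
        // Undo whatever setStoreLocation registered for this store rather
        // than deleting an HDFS path; the client below is illustrative only.
        MyMetastoreClient client =
                MyMetastoreClient.open(job.getConfiguration());
        client.dropTable(location);
    }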

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Mar  2 01:01:51 2010
@@ -47,6 +47,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 public class LOLoad extends RelationalOperator {
     private static final long serialVersionUID = 2L;
@@ -164,7 +165,7 @@
         if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) {
             LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc;
             ResourceSchema rSchema = loadMetadata.getSchema(
-                    mInputFileSpec.getFileName(), conf);
+                    mInputFileSpec.getFileName(), new Job(conf));
             return Schema.getPigSchema(rSchema);
         } else {
             return null;
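
The caller-side adaptation is mechanical: wrap the existing Configuration in a Job, as above and in PartitionFilterOptimizer below. One caveat, a property of Hadoop's Job constructor rather than of this patch: new Job(conf) copies the configuration, so anything a loader sets through job.getConfiguration() inside getSchema is not reflected back in the original conf. A small self-contained sketch of the calling pattern (SchemaLookup is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.LoadMetadata;
    import org.apache.pig.ResourceSchema;

    public class SchemaLookup {
        static ResourceSchema lookup(LoadMetadata loader, String fileName,
                Configuration conf) throws IOException {
            // Post-PIG-1265 signature: pass a Job wrapping the Configuration.
            return loader.getSchema(fileName, new Job(conf));
        }
    }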

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Tue Mar  2 01:01:51 2010
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
@@ -128,7 +129,7 @@
         loadMetadata = (LoadMetadata)loadFunc;
         try {
             partitionKeys = loadMetadata.getPartitionKeys(
-                    loLoad.getInputFile().getFileName(), loLoad.getConfiguration());
+                    loLoad.getInputFile().getFileName(), new Job(loLoad.getConfiguration()));
             if(partitionKeys == null || partitionKeys.length == 0) {
                 return false;
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java Tue Mar  2 01:01:51 2010
@@ -56,19 +56,25 @@
      */
     
     public LogicalPlanValidationExecutor(LogicalPlan plan,
-                                         PigContext pigContext) {
+                                         PigContext pigContext, 
+                                         boolean beforeOptimizer) {
    
         // Default validations
-        if (!pigContext.inExplain) {
+        if (!pigContext.inExplain && !beforeOptimizer) {
             // When running explain we don't want to check for input
-            // files.
+            // files. Run this validator after the optimizer, for two reasons:
+            // 1) inputs/outputs may change (or be optimized away)
+            // 2) this validator calls checkSchema on the StoreFunc for the
+            // store(s), and the schema it checks should be the correct
+            // schema after optimization
             validatorList.add(new InputOutputFileValidator(pigContext)) ;
+        } else if (beforeOptimizer) {
+            // This one has to be done before the type checker.
+            //validatorList.add(new TypeCastInserterValidator()) ;
+            validatorList.add(new TypeCheckingValidator()) ;
+            
+            validatorList.add(new SchemaAliasValidator()) ;
         }
-        // This one has to be done before the type checker.
-        //validatorList.add(new TypeCastInserterValidator()) ;
-        validatorList.add(new TypeCheckingValidator()) ;
-        
-        validatorList.add(new SchemaAliasValidator()) ;
     }    
 
     public void validate(LogicalPlan plan,

Modified: hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Tue Mar  2 01:01:51 2010
@@ -214,13 +214,19 @@
         CompilationMessageCollector collector = new CompilationMessageCollector();
         FrontendException caught = null;
         try {
+            boolean isBeforeOptimizer = true;
             LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
-                    plan, pigContext);
+                    plan, pigContext, isBeforeOptimizer);
             validator.validate(plan, collector);
 
             FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
                     plan);
             optimizer.optimize();
+            
+            isBeforeOptimizer = false;
+            validator = new LogicalPlanValidationExecutor(
+                    plan, pigContext, isBeforeOptimizer);
+            validator.validate(plan, collector);
         } catch (FrontendException fe) {
             // Need to go through and see what the collector has in it. But
             // remember what we've caught so we can wrap it into what we

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java Tue Mar  2 01:01:51 2010
@@ -18,13 +18,18 @@
 package org.apache.pig.test;
 
 import java.io.* ;
+import java.util.Iterator;
 import java.util.Properties;
 
 import org.apache.pig.ExecType; 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -37,7 +42,10 @@
 import org.apache.pig.impl.logicalLayer.validators.* ;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType; 
+import org.apache.pig.impl.util.LogUtils;
 import org.junit.Test;
+
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 public class TestInputOutputFileValidator extends TestCase {
@@ -57,7 +65,8 @@
         LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getFs()) ;        
         
         CompilationMessageCollector collector = new CompilationMessageCollector() ;        
-        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+        boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
         executor.validate(plan, collector) ;
         
         assertFalse(collector.hasError()) ;
@@ -77,7 +86,8 @@
         LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;        
         
         CompilationMessageCollector collector = new CompilationMessageCollector() ;        
-        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+        boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
         try {
             executor.validate(plan, collector) ;
             fail("Expected to fail.");
@@ -104,7 +114,8 @@
         LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;                     
         
         CompilationMessageCollector collector = new CompilationMessageCollector() ;        
-        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+        boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
         executor.validate(plan, collector) ;
             
         assertFalse(collector.hasError()) ;
@@ -123,7 +134,8 @@
         LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;                     
         
         CompilationMessageCollector collector = new CompilationMessageCollector() ;        
-        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+        boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+        LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
         try {
             executor.validate(plan, collector) ;
             fail("Excepted to fail.");
@@ -137,6 +149,93 @@
         }       
 
     }
+    
+    /**
+     * Testcase to ensure that input/output validation allows storing to a location
+     * that does not exist when using {@link PigServer#store(String, String)}
+     * @throws Exception
+     */
+    @Test
+    public void testPigServerStore() throws Exception {
+        String input = "input.txt";
+        String output= "output.txt";
+        String data[] = new String[] {"hello\tworld"};
+        ExecType[] modes = new ExecType[] {ExecType.MAPREDUCE, ExecType.LOCAL};
+        PigServer pig = null;
+        for (ExecType execType : modes) {
+            try {
+                if(execType == ExecType.MAPREDUCE) {
+                    pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+                } else {
+                    Properties props = new Properties();
+                    props.put(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                    pig = new PigServer(ExecType.LOCAL, props);
+                }
+                // reinitialize FileLocalizer for each mode
+                // this is needed for the tmp file creation as part of
+                // PigServer.openIterator
+                FileLocalizer.setInitialized(false);
+                Util.deleteFile(pig.getPigContext(), input);
+                Util.deleteFile(pig.getPigContext(), output);
+                Util.createInputFile(pig.getPigContext(), input, data);
+                pig.registerQuery("a = load '" + input + "';");
+                pig.store("a", output);
+                pig.registerQuery("b = load '" + output + "';");
+                Iterator<Tuple> it = pig.openIterator("b");
+                Tuple t = it.next();
+                Assert.assertEquals("hello", t.get(0).toString());
+                Assert.assertEquals("world", t.get(1).toString());
+                Assert.assertEquals(false, it.hasNext());
+            } finally {
+                Util.deleteFile(pig.getPigContext(), input);
+                Util.deleteFile(pig.getPigContext(), output);
+            }
+        }
+    }
+    
+    /**
+     * Test case verifying that input/output file validation catches the case
+     * where the output file exists when using 
+     * {@link PigServer#store(String, String)}
+     * @throws Exception
+     */
+    @Test
+    public void testPigServerStoreNeg() throws Exception {
+        String input = "input.txt";
+        String output= "output.txt";
+        String data[] = new String[] {"hello\tworld"};
+        ExecType[] modes = new ExecType[] {ExecType.MAPREDUCE, ExecType.LOCAL};
+        PigServer pig = null;
+        for (ExecType execType : modes) {
+            try {
+                boolean exceptionCaught = false;
+                if(execType == ExecType.MAPREDUCE) {
+                    pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+                } else {
+                    Properties props = new Properties();
+                    props.put(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                    pig = new PigServer(ExecType.LOCAL, props);
+                }
+                Util.deleteFile(pig.getPigContext(), input);
+                Util.deleteFile(pig.getPigContext(), output);
+                Util.createInputFile(pig.getPigContext(), input, data);
+                Util.createInputFile(pig.getPigContext(), output, data);
+                try {
+                    pig.registerQuery("a = load '" + input + "';");
+                    pig.store("a", output);
+                } catch (Exception e) {
+                    assertEquals(6000, LogUtils.getPigException(e).getErrorCode());
+                    exceptionCaught = true;
+                }
+                if(!exceptionCaught) {
+                    Assert.fail("Expected exception to be caught");
+                }
+            } finally {
+                Util.deleteFile(pig.getPigContext(), input);
+                Util.deleteFile(pig.getPigContext(), output);
+            }
+        }
+    }
         
     private LogicalPlan genNewLoadStorePlan(String inputFile,
                                             String outputFile, DataStorage dfs) 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Tue Mar  2 01:01:51 2010
@@ -302,13 +302,19 @@
         CompilationMessageCollector collector = new CompilationMessageCollector();
         FrontendException caught = null;
         try {
+            boolean isBeforeOptimizer = true;
             LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
-                    plan, pigContext);
+                    plan, pigContext, isBeforeOptimizer);
             validator.validate(plan, collector);
 
             FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
                     plan);
             optimizer.optimize();
+            
+            isBeforeOptimizer = false;
+            validator = new LogicalPlanValidationExecutor(
+                    plan, pigContext, isBeforeOptimizer);
+            validator.validate(plan, collector);
         } catch (FrontendException fe) {
             // Need to go through and see what the collector has in it. But
             // remember what we've caught so we can wrap it into what we

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Tue Mar  2 01:01:51 2010
@@ -505,20 +505,20 @@
         }
 
         @Override
-        public String[] getPartitionKeys(String location, Configuration conf)
+        public String[] getPartitionKeys(String location, Job job)
                 throws IOException {
             return partCols;
         }
 
         @Override
-        public ResourceSchema getSchema(String location, Configuration conf)
+        public ResourceSchema getSchema(String location, Job job)
                 throws IOException {
             return new ResourceSchema(schema);
         }
 
         @Override
         public ResourceStatistics getStatistics(String location,
-                Configuration conf) throws IOException {
+                Job job) throws IOException {
             return null;
         }