You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/01/15 00:17:43 UTC

svn commit: r899461 - in /hadoop/pig/branches/load-store-redesign: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/...

Author: daijy
Date: Thu Jan 14 23:17:42 2010
New Revision: 899461

URL: http://svn.apache.org/viewvc?rev=899461&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces--PIG-1090-12.patch

Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java Thu Jan 14 23:17:42 2010
@@ -524,7 +524,7 @@
                 }
             }
             
-            LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf);
+            LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, id);
             List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
             if (jobs.size() < 1) {
                 throw new IOException("Couldn't retrieve job.");
@@ -751,7 +751,7 @@
             }
             
             lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 
-                                               PigStorage.class.getName(), leaf);
+                                               PigStorage.class.getName(), leaf, "fake");
         }
 
         return lp;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Thu Jan 14 23:17:42 2010
@@ -18,6 +18,7 @@
 
 package org.apache.pig;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
@@ -25,8 +26,9 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
-public class ResourceSchema {
+public class ResourceSchema implements Serializable {
 
+    private static final long serialVersionUID = 1L;
     /* Array Getters intentionally return mutable arrays instead of copies,
      * to simplify updates without unnecessary copying.
      * Setters make a copy of the arrays in order to prevent an array
@@ -44,7 +46,8 @@
         
     private int version = 0;
 
-    public static class ResourceFieldSchema {
+    public static class ResourceFieldSchema implements Serializable {
+        private static final long serialVersionUID = 1L;
         private String name;
         
         // values are constants from DataType

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Thu Jan 14 23:17:42 2010
@@ -72,9 +72,7 @@
      * <b>store A into 'bla'</b>
      * then 'bla' is the location.  This location should be either a file name
      * or a URI.  If it does not have a URI scheme Pig will assume it is a 
-     * filename.  This will be 
-     * called during planning on the front end, not during execution on
-     * the backend.
+     * filename.  This will be called multiple times during execution on the backend.
      * @param location Location indicated in store statement.
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
@@ -83,7 +81,8 @@
  
     /**
      * Set the schema for data to be stored.  This will be called on the
-     * front end during planning. A Store function should implement this function to
+     * front end during planning if the store is associated with a schema.
+     * A Store function should implement this function to
      * check that a given schema is acceptable to it.  For example, it
      * can check that the correct partition keys are included;
      * a storage function to be written directly to an OutputFormat can
@@ -110,5 +109,13 @@
      * @throws IOException if an exception occurs during the write
      */
     void putNext(Tuple t) throws IOException;
-
+    
+    /**
+     * This method will be called by Pig both in the front end and back end to
+     * pass a unique signature to the {@link StoreFunc} which it can use to store
+     * information in the {@link UDFContext} which it needs to store between
+     * various method invocations in the front end and back end. 
+     * @param signature a unique signature to identify this StoreFunc
+     */
+    public void setStoreFuncUDFContextSignature(String signature);
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Jan 14 23:17:42 2010
@@ -33,24 +33,21 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -446,10 +443,18 @@
             
             for (POStore st: mapStores) {
                 storeLocations.add(st);
+                StoreFunc sFunc = st.getStoreFunc();
+                //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (st.getSchema()!=null)
+                    sFunc.checkSchema(new ResourceSchema(st.getSchema()));
             }
 
             for (POStore st: reduceStores) {
                 storeLocations.add(st);
+                StoreFunc sFunc = st.getStoreFunc();
+                //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (st.getSchema()!=null)
+                    sFunc.checkSchema(new ResourceSchema(st.getSchema()));
             }
 
             // the OutputFormat we report to Hadoop is always PigOutputFormat

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Thu Jan 14 23:17:42 2010
@@ -18,33 +18,13 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.SortInfo;
-import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.ObjectSerializer;
 /**
  * This class is used to have a POStore write to DFS via a output
  * collector/record writer. It sets up a modified job configuration to
@@ -78,10 +58,7 @@
     public StoreFunc createStoreFunc(POStore store) 
         throws IOException {
 
-        Configuration outputConf = context.getConfiguration();
-
         StoreFunc storeFunc = store.getStoreFunc();
-        Class outputFormatClass = storeFunc.getOutputFormat().getClass();
 
         // call the setStoreLocation on the storeFunc giving it the
         // Job. Typically this will result in the OutputFormat of the
@@ -92,8 +69,7 @@
         PigOutputFormat.setLocation(context, store);
         OutputFormat outputFormat = null;
         try {
-            outputFormat = (OutputFormat)ReflectionUtils.newInstance(
-                    outputFormatClass, outputConf);
+            outputFormat = (OutputFormat)storeFunc.getOutputFormat();
 
             // create a new record writer
             writer = outputFormat.getRecordWriter(context);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Thu Jan 14 23:17:42 2010
@@ -51,8 +51,6 @@
 public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
     
     private enum Mode { SINGLE_STORE, MULTI_STORE};
-    
-    private OutputCommitter committer;
 
     /** hadoop job output directory */
     public static final String MAPRED_OUTPUT_DIR = "mapred.output.dir";
@@ -67,6 +65,8 @@
      
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
                 throws IOException, InterruptedException {
+        // Set up UDFContext so that the StoreFunc can make use of it
+        MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
         List<POStore> mapStores = getStores(taskattemptcontext, 
                 JobControlCompiler.PIG_MAP_STORES);
         List<POStore> reduceStores = getStores(taskattemptcontext, 
@@ -193,21 +193,19 @@
 
     @Override
     public void checkOutputSpecs(JobContext jobcontext) throws IOException, InterruptedException {
+        // Set up UDFContext so that the StoreFunc can make use of it
+        MapRedUtil.setupUDFContext(jobcontext.getConfiguration());
         List<POStore> mapStores = getStores(jobcontext, 
                 JobControlCompiler.PIG_MAP_STORES);
         checkOutputSpecsHelper(mapStores, jobcontext);
         List<POStore> reduceStores = getStores(jobcontext, 
                 JobControlCompiler.PIG_REDUCE_STORES);
         checkOutputSpecsHelper(reduceStores, jobcontext);
-        
     }
 
     private void checkOutputSpecsHelper(List<POStore> stores, JobContext 
             jobcontext) throws IOException, InterruptedException {
         for (POStore store : stores) {
-            StoreFunc sFunc = store.getStoreFunc();
-            OutputFormat of = sFunc.getOutputFormat();
-            
             // make a copy of the original JobContext so that
             // each OutputFormat get a different copy 
             JobContext jobContextCopy = new JobContext(
@@ -216,6 +214,9 @@
             // set output location
             PigOutputFormat.setLocation(jobContextCopy, store);
             
+            StoreFunc sFunc = store.getStoreFunc();
+            OutputFormat of = sFunc.getOutputFormat();
+            
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
             of.checkOutputSpecs(jobContextCopy);
@@ -237,6 +238,9 @@
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext 
             taskattemptcontext) throws IOException, InterruptedException {
+        // Set up UDFContext so that StoreFunc.getOutputFormat, which is called inside the
+        // constructor of PigOutputCommitter, can make use of it
+        MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
         // we return an instance of PigOutputCommitter to Hadoop - this instance
         // will wrap the real OutputCommitter(s) belonging to the store(s)
         return new PigOutputCommitter(taskattemptcontext);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Jan 14 23:17:42 2010
@@ -1603,6 +1603,7 @@
         store.setAlias(loStore.getPlan().getPredecessors(loStore).get(0).getAlias());
         store.setSFile(loStore.getOutputFile());
         store.setInputSpec(loStore.getInputSpec());
+        store.setSignature(loStore.getSignature());
         try {
             // create a new schema for ourselves so that when
             // we serialize we are not serializing objects that

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Jan 14 23:17:42 2010
@@ -38,7 +38,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Jan 14 23:17:42 2010
@@ -70,6 +70,8 @@
     // column names and the asc/dsc info
     private SortInfo sortInfo;
     
+    private String signature;
+    
     public POStore(OperatorKey k) {
         this(k, -1, null);
     }
@@ -202,8 +204,9 @@
     }
     
     public StoreFunc getStoreFunc() {
-        return (StoreFunc) 
-        PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        sFunc.setStoreFuncUDFContextSignature(signature);
+        return sFunc;
     }
     
     /**
@@ -219,4 +222,12 @@
     public SortInfo getSortInfo() {
         return sortInfo;
     }
+    
+    public String getSignature() {
+        return signature;
+    }
+    
+    public void setSignature(String signature) {
+        this.signature = signature;
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Thu Jan 14 23:17:42 2010
@@ -433,4 +433,9 @@
     public void setPartitionFilter(Expression plan) throws IOException {
         throw new UnsupportedOperationException();
     }
+    
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Thu Jan 14 23:17:42 2010
@@ -297,4 +297,8 @@
         return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
     }
 
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java Thu Jan 14 23:17:42 2010
@@ -46,6 +46,8 @@
     // FileSpec is set in PigServer.postProcess. It can be used to
     // reload this store, if the optimizer has the need.
     private FileSpec mInputSpec;
+    
+    private String signature;
 
     transient private StoreFunc mStoreFunc;
     private static Log log = LogFactory.getLog(LOStore.class);
@@ -59,7 +61,7 @@
      *            the file to be stored
      */
     public LOStore(LogicalPlan plan, OperatorKey key,
-            FileSpec outputFileSpec) throws IOException {
+            FileSpec outputFileSpec, String alias) throws IOException {
         super(plan, key);
 
         mOutputFile = outputFileSpec;
@@ -70,13 +72,28 @@
         // Also remove the commented out import org.apache.pig.impl.PigContext
 
         try { 
-             mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec()); 
+             mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+             this.mAlias = alias;
+             this.signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs());
+             mStoreFunc.setStoreFuncUDFContextSignature(this.signature);
         } catch (Exception e) { 
             IOException ioe = new IOException(e.getMessage()); 
             ioe.setStackTrace(e.getStackTrace());
             throw ioe; 
         }
     }
+    
+    private String constructSignature(String alias, String filename, String[] args) {
+        // Layout: alias_filename_arg0_arg1...; the "_" after filename remains even when there are no args.
+        StringBuilder sig = new StringBuilder().append(alias).append('_').append(filename).append('_');
+        if (args != null) {
+            for (int idx = 0; idx < args.length; idx++) {
+                if (idx > 0) { sig.append('_'); }
+                sig.append(args[idx]);
+            }
+        }
+        return sig.toString();
+    }
 
     public FileSpec getOutputFile() {
         return mOutputFile;
@@ -198,5 +215,16 @@
         result.add(new RequiredFields(true));
         return result;
     }
-
+    
+    /** Sets the alias, then rebuilds the signature the same way the constructor does and passes it to the StoreFunc. */
+    @Override
+    public void setAlias(String newAlias) {
+        super.setAlias(newAlias);
+        signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs());
+        mStoreFunc.setStoreFuncUDFContextSignature(signature);
+    }
+    
+    public String getSignature() {
+        return signature;
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jan 14 23:17:42 2010
@@ -136,7 +136,8 @@
                                                 LogicalPlan readFrom,
                                                 String fileName,
                                                 String func,
-                                                LogicalOperator input) throws FrontendException {
+                                                LogicalOperator input,
+                                                String alias) throws FrontendException {
 
         if (func == null) {
             func = PigStorage.class.getName();
@@ -152,7 +153,8 @@
         try {
 		store = new LOStore(storePlan,
 							   new OperatorKey(scope, storeNodeId),
-                               new FileSpec(fileName, new FuncSpec(func)));
+                               new FileSpec(fileName, new FuncSpec(func)), 
+                               alias);
         } catch (IOException ioe) {
             throw new FrontendException(ioe.getMessage(), ioe);
         }
@@ -2417,7 +2419,7 @@
             fileNameMap.put(fileName, absolutePath);   
         }
         LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
-                                            new FileSpec(absolutePath, funcSpec));
+                                            new FileSpec(absolutePath, funcSpec), t.image);
 
         LogicalOperator input = mapAliasOp.get(t.image);
         if (input == null)

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java Thu Jan 14 23:17:42 2010
@@ -95,7 +95,10 @@
         PigException pigException = LogUtils.getPigException(t);
 
         if(pigException != null) {
-            message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getMessage();
+            if (pigException.getCause()!=null)
+                message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getCause().getMessage();
+            else
+                message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getMessage();
         } else {
             if((t instanceof ParseException 
                     || t instanceof org.apache.pig.tools.pigscript.parser.TokenMgrError 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java Thu Jan 14 23:17:42 2010
@@ -151,7 +151,7 @@
             new FileSpec(outputFile, new FuncSpec("org.apache.pig.builtin.PigStorage"));
         LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1, 
                 ConfigurationUtil.toConfiguration(dfs.getConfiguration())) ;       
-        LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2) ;
+        LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2, "new") ;
         
         plan.add(load) ;
         plan.add(store) ;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Thu Jan 14 23:17:42 2010
@@ -2699,6 +2699,10 @@
             // TODO Auto-generated method stub
             
         }
+        
+        @Override
+        public void setStoreFuncUDFContextSignature(String signature) {
+        }
                 
     }