You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/04/30 00:23:56 UTC

svn commit: r769970 - in /hadoop/pig/branches/multiquery: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/execut...

Author: gates
Date: Wed Apr 29 22:23:55 2009
New Revision: 769970

URL: http://svn.apache.org/viewvc?rev=769970&view=rev
Log:
PIG-652 Adapt changes in store interface to multi-query changes.


Added:
    hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Apr 29 22:23:55 2009
@@ -615,3 +615,6 @@
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
 
 	PIG-627: multiquery support M3 (rding via gates)
+
+	PIG-652: Adapt changes in store interface to multi-query changes (hagleitn
+	via gates).

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java?rev=769970&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java Wed Apr 29 22:23:55 2009
@@ -0,0 +1,67 @@
+/**
+ * 
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A class that encapsulates metadata information that an
+ * OutputFormat (or possibly a StoreFunc) may want to know
+ * about the data that needs to be stored.
+ */
+public class StoreConfig implements Serializable {
+    
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    private String location;
+    private Schema schema;
+    
+    
+    /**
+     * @param location
+     * @param schema
+     */
+    public StoreConfig(String location, Schema schema) {
+        this.location = location;
+        this.schema = schema;
+    }
+    
+    /**
+     * @return the location
+     */
+    public String getLocation() {
+        return location;
+    }
+    /**
+     * @param location the location to set
+     */
+    public void setLocation(String location) {
+        this.location = location;
+    }
+    /**
+     * @return the schema
+     */
+    public Schema getSchema() {
+        return schema;
+    }
+    /**
+     * @param schema the schema to set
+     */
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return "{location:" + location + ", schema:" + schema + "}";
+    }
+
+}

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java Wed Apr 29 22:23:55 2009
@@ -58,7 +58,24 @@
      * 
      * @throws IOException
      */
-    public abstract void finish() throws IOException;
+    public abstract void finish() throws IOException;
+    
+    /**
+     * Specify a backend specific class to use to prepare for
+     * storing output.  In the Hadoop case, this can return an
+     * OutputFormat that will be used instead of PigOutputFormat.  The 
+     * framework will call this function and if a Class is returned
+     * that implements OutputFormat it will be used. For more details on how
+     * the OutputFormat should interact with Pig, see 
+     * {@link org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, String, org.apache.hadoop.util.Progressable)}
+     * @return Backend specific class used to prepare for storing output.
+     * If the {@link StoreFunc} implementation does not have a class to prepare
+     * for storing output, it can return null and a default Pig implementation
+     * will be used to prepare for storing output.
+     * @throws IOException if the class does not implement the expected
+     * interface(s).
+     */
+    public Class getStorePreparationClass() throws IOException;
 
     
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Apr 29 22:23:55 2009
@@ -36,12 +36,15 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -100,6 +103,8 @@
     PigContext pigContext;
     
     private final Log log = LogFactory.getLog(getClass());
+    
+    public static final String PIG_STORE_CONFIG = "pig.store.config";
 
     public static final String LOG_DIR = "_logs";
 
@@ -310,7 +315,6 @@
                                   "pig.streaming.cache.files", false);
 
             jobConf.setInputFormat(PigInputFormat.class);
-            jobConf.setOutputFormat(PigOutputFormat.class);
             
             //Process POStore and remove it from the plan
             List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
@@ -328,11 +332,37 @@
                     st = reduceStores.remove(0);
                     mro.reducePlan.remove(st);
                 }
+
+                // If the StoreFunc associated with the POStore implements
+                // getStorePreparationClass() and returns a non-null value,
+                // then it may want to supply its own OutputFormat for writing out to hadoop.
+                // Check if this is the case; if so, use the OutputFormat class the
+                // StoreFunc gives us, else use our default PigOutputFormat.
+                Object storeFunc = PigContext.instantiateFuncFromSpec(st.getSFile().getFuncSpec());
+                Class sPrepClass = null;
+                try {
+                    sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+                } catch(AbstractMethodError e) {
+                    // this is for backward compatibility wherein some old StoreFunc
+                    // which does not implement getStorePreparationClass() is being
+                    // used. In this case, we want to just use PigOutputFormat
+                    sPrepClass = null;
+                }
+                if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+                    jobConf.setOutputFormat(sPrepClass);
+                } else {
+                    jobConf.setOutputFormat(PigOutputFormat.class);
+                }
+                
+                //set out filespecs
                 String outputPath = st.getSFile().getFileName();
                 FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
                 FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+             
                 jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-                
+                jobConf.set(PIG_STORE_CONFIG, 
+                            ObjectSerializer.serialize(new StoreConfig(outputPath, st.getSchema())));
+
                 jobConf.set("pig.streaming.log.dir", 
                             new Path(outputPath, LOG_DIR).toString());
                 jobConf.set("pig.streaming.task.output.dir", outputPath);
@@ -349,6 +379,7 @@
                     fs.mkdirs(tmpOut);
                 }
 
+                jobConf.setOutputFormat(PigOutputFormat.class);
                 FileOutputFormat.setOutputPath(jobConf, curTmpPath);
 
                 jobConf.set("pig.streaming.log.dir", 

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -36,10 +36,13 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
@@ -65,6 +68,7 @@
     private JobConf job;
 
     private final Log log = LogFactory.getLog(getClass());
+    public static final String PIG_STORE_CONFIG = "pig.store.config";
     
     public MapReducePOStoreImpl(JobConf job) {
         this.job = job;
@@ -75,14 +79,33 @@
     }
 
     @Override
-    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException {
 
         // set up a new job conf
         JobConf outputConf = new JobConf(job);
         String tmpPath = PlanHelper.makeStoreTmpPath(sFile.getFileName());
 
-        // Right now we're always using PigOutputFormat.
-        outputConf.setOutputFormat(PigOutputFormat.class);
+        // If the StoreFunc associated with the POStore implements
+        // getStorePreparationClass() and returns a non-null value,
+        // then it may want to supply its own OutputFormat for writing out to hadoop.
+        // Check if this is the case; if so, use the OutputFormat class the
+        // StoreFunc gives us, else use our default PigOutputFormat.
+        Object storeFunc = PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        Class sPrepClass = null;
+        try {
+            sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+        } catch(AbstractMethodError e) {
+            // this is for backward compatibility wherein some old StoreFunc
+            // which does not implement getStorePreparationClass() is being
+            // used. In this case, we want to just use PigOutputFormat
+            sPrepClass = null;
+        }
+        if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+            outputConf.setOutputFormat(sPrepClass);
+        } else {
+            outputConf.setOutputFormat(PigOutputFormat.class);
+        }
 
         // PigOuputFormat will look for pig.storeFunc to actually
         // write stuff out.
@@ -94,6 +117,10 @@
         Path outputDir = new Path(sFile.getFileName()).makeQualified(FileSystem.get(outputConf));
         outputConf.set("mapred.output.dir", outputDir.toString());
 
+        // Set the store config (output location and schema)
+        outputConf.set(PIG_STORE_CONFIG, 
+                       ObjectSerializer.serialize(new StoreConfig(outputDir.toString(), schema)));
+
         // The workpath is set to a unique-per-store subdirectory of
         // the current working directory.
         String workPath = outputConf.get("mapred.work.output.dir");
@@ -168,5 +195,10 @@
         @Override
         public void finish() throws IOException {
         }
+
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            return null;
+        }
     }
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Wed Apr 29 22:23:55 2009
@@ -33,6 +33,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -47,6 +48,24 @@
 public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
     public static final String PIG_OUTPUT_FUNC = "pig.output.func";
 
+    /**
+     * In general, the mechanism for an OutputFormat in Pig to get hold of the storeFunc
+     * and the metadata information (for now schema and location provided for the store in
+     * the pig script) is through the following Utility static methods:
+     * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(JobConf)} 
+     * - this will get the {@link org.apache.pig.StoreFunc} reference to use in the RecordWriter.write()
+     * {@link MapRedUtil#getStoreConfig(JobConf)} - this will get the {@link org.apache.pig.StoreConfig}
+     * reference which has metadata like the location (the string supplied with store statement in the script)
+     * and the {@link org.apache.pig.impl.logicalLayer.schema.Schema} of the data. The OutputFormat
+     * should NOT use the location in the StoreConfig to write the output if the location represents a 
+     * Hadoop dfs path. This is because when "speculative execution" is turned on in Hadoop, multiple
+     * attempts for the same task (for a given partition) may be running at the same time. So using the
+     * location will mean that these different attempts will over-write each other's output.
+     * The OutputFormat should use 
+     * {@link org.apache.hadoop.mapred.FileOutputFormat#getWorkOutputPath(JobConf)}
+     * which will provide a safe output directory into which the OutputFormat should write
+     * the part file (given by the name argument in the getRecordWriter() call).
+     */
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
             String name, Progressable progress) throws IOException {
         Path outputDir = FileOutputFormat.getWorkOutputPath(job);
@@ -56,20 +75,7 @@
     public PigRecordWriter getRecordWriter(FileSystem fs, JobConf job,
             Path outputDir, String name, Progressable progress)
             throws IOException {
-        StoreFunc store;
-        String storeFunc = job.get("pig.storeFunc", "");
-        if (storeFunc.length() == 0) {
-            store = new PigStorage();
-        } else {
-            try {
-                store = (StoreFunc) PigContext
-                        .instantiateFuncFromSpec(storeFunc);
-            } catch (Exception e) {
-                int errCode = 2081;
-                String msg = "Unable to setup the store function.";
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
+        StoreFunc store = MapRedUtil.getStoreFunc(job);
 
         String parentName = FileOutputFormat.getOutputPath(job).getName();
         int suffixStart = parentName.lastIndexOf('.');

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Apr 29 22:23:55 2009
@@ -46,6 +46,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -1197,6 +1198,20 @@
                 .getNextNodeId(scope)));
         store.setSFile(loStore.getOutputFile());
         store.setInputSpec(loStore.getInputSpec());
+        try {
+            // create a new schema for ourselves so that when
+            // we serialize we are not serializing objects that
+            // contain the schema - apparently Java tries to
+            // serialize the object containing the schema if
+            // we are trying to serialize the schema reference in
+            // the containing object. The schema here will be serialized
+            // in JobControlCompiler
+            store.setSchema(new Schema(loStore.getSchema()));
+        } catch (FrontendException e1) {
+            int errorCode = 1060;
+            String message = "Cannot resolve Store output schema";  
+            throw new VisitorException(message, errorCode, PigException.BUG, e1);    
+        }
         currentPlan.add(store);
         
         List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore); 

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Wed Apr 29 22:23:55 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -53,6 +54,7 @@
     private final Log log = LogFactory.getLog(getClass());
     private POStoreImpl impl;
     private FileSpec sFile;
+    private Schema schema;
 
     // flag to distinguish user stores from MRCompiler stores.
     private boolean isTmpStore;
@@ -81,7 +83,7 @@
     public void setUp() throws IOException{
         if (impl != null) {
             try{
-                storer = impl.createStoreFunc(sFile);
+                storer = impl.createStoreFunc(sFile, schema);
             }catch (IOException ioe) {
                 int errCode = 2081;
                 String msg = "Unable to setup the store function.";            
@@ -184,4 +186,12 @@
     public void setStoreImpl(POStoreImpl impl) {
         this.impl = impl;
     }
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+    
+    public Schema getSchema() {
+        return schema;
+    }
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
  * This class is used to specify the actual behavior of the store
@@ -31,7 +32,8 @@
      * @param sFile - The file the store should write to
      * @throws IOException
      */
-    public abstract StoreFunc createStoreFunc(FileSpec sFile) throws IOException;
+    public abstract StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException;
     
     /**
      * At the end of processing, the outputstream is closed

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=769970&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Apr 29 22:23:55 2009
@@ -0,0 +1,68 @@
+/**
+ * 
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * A class of utility static methods to be used in the hadoop map reduce backend
+ */
+public class MapRedUtil {
+
+    /**
+     * This method is to be called from an 
+     * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(FileSystem ignored, JobConf job,
+                                     String name, Progressable progress)}
+     * method to obtain a reference to the {@link org.apache.pig.StoreFunc} object to be used by
+     * that OutputFormat to perform the write() operation
+     * @param conf the JobConf object
+     * @return the StoreFunc reference
+     * @throws ExecException
+     */
+    public static StoreFunc getStoreFunc(JobConf conf) throws ExecException {
+        StoreFunc store;
+        String storeFunc = conf.get("pig.storeFunc", "");
+        if (storeFunc.length() == 0) {
+            store = new PigStorage();
+        } else {
+            try {
+                store = (StoreFunc) PigContext
+                        .instantiateFuncFromSpec(storeFunc);
+            } catch (Exception e) {
+                int errCode = 2081;
+                String msg = "Unable to setup the store function.";
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+        return store;
+    }
+    
+    /**
+     * This method is to be called from an 
+     * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(FileSystem ignored, JobConf job,
+                                     String name, Progressable progress)}
+     * method to obtain a reference to the {@link org.apache.pig.StoreConfig} object. The StoreConfig
+     * object will contain metadata information like schema and location to be used by
+     * that OutputFormat to perform the write() operation
+     * @param conf the JobConf object
+     * @return StoreConfig object containing metadata information useful for
+     * an OutputFormat to write the data
+     * @throws IOException
+     */
+    public static StoreConfig getStoreConfig(JobConf conf) throws IOException {
+        return (StoreConfig) ObjectSerializer.deserialize(conf.get(JobControlCompiler.PIG_STORE_CONFIG));
+    }
+}

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -24,6 +24,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 
 /**
@@ -43,7 +44,8 @@
     }
 
     @Override
-    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException {
         this.sFile = sFile;
         storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
         os = FileLocalizer.create(sFile.getFileName(), pc);

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java Wed Apr 29 22:23:55 2009
@@ -393,4 +393,13 @@
     public boolean equals(Object obj) {
         return true;
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java Wed Apr 29 22:23:55 2009
@@ -159,4 +159,13 @@
         // TODO Auto-generated method stub
         
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java Wed Apr 29 22:23:55 2009
@@ -41,6 +41,15 @@
 
     public void putNext(Tuple f) throws IOException {
         os.write((f.toString() + recordDelimiter).getBytes());
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
     }
 
 }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java Wed Apr 29 22:23:55 2009
@@ -327,5 +327,14 @@
         return this.fieldDel == other.fieldDel;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 
 }

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java Wed Apr 29 22:23:55 2009
@@ -451,7 +451,7 @@
     	planTester.buildPlan("a = load 'input';");
     	LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
     	PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
-    	POStore store = GenPhyOp.topStoreOp();
+    	POStore store = GenPhyOp.dummyPigStorageOp();
     	pp.addAsLeaf(store);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
     	

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java Wed Apr 29 22:23:55 2009
@@ -288,6 +288,15 @@
             return null;
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+         */
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
     }
 
 

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java Wed Apr 29 22:23:55 2009
@@ -313,6 +313,15 @@
             return null;
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+         */
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
     }
 
 

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java Wed Apr 29 22:23:55 2009
@@ -26,6 +26,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -37,9 +38,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.impl.plan.PlanException;
 
@@ -755,6 +753,12 @@
         PORead ret = new PORead(new OperatorKey("", r.nextLong()), bag);
         return ret;
     }
+    
+    public static POStore dummyPigStorageOp() {
+        POStore ret = new POStore(new OperatorKey("", r.nextLong()));
+        ret.setSFile(new FileSpec("DummyFil", new FuncSpec(PigStorage.class.getName() + "()")));
+        return ret;
+    }
 
     public static POStore topStoreOp() {
         POStore ret = new POStore(new OperatorKey("", r.nextLong()));