You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/04/30 00:23:56 UTC
svn commit: r769970 - in /hadoop/pig/branches/multiquery: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/execut...
Author: gates
Date: Wed Apr 29 22:23:55 2009
New Revision: 769970
URL: http://svn.apache.org/viewvc?rev=769970&view=rev
Log:
PIG-652 Adapt changes in store interface to multi-query changes.
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Apr 29 22:23:55 2009
@@ -615,3 +615,6 @@
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
PIG-627: multiquery support M3 (rding via gates)
+
+ PIG-652: Adapt changes in store interface to multi-query changes (hagleitn
+ via gates).
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java?rev=769970&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java Wed Apr 29 22:23:55 2009
@@ -0,0 +1,67 @@
+/**
+ * StoreConfig: store metadata handed from the Pig front end to the backend.
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Serializable holder for the metadata that an OutputFormat (or possibly a
+ * StoreFunc) may want to know about the data being stored: the output
+ * location and the schema of the stored data.
+ */
+public class StoreConfig implements Serializable {
+
+ /** Version id for Java serialization of this class. */
+ private static final long serialVersionUID = 1L;
+
+ /** Output location of the store. */
+ private String location;
+
+ /** Schema of the data being stored. */
+ private Schema schema;
+
+ /**
+ * Builds a config describing a single store.
+ *
+ * @param location output location of the store
+ * @param schema schema of the data to be stored
+ */
+ public StoreConfig(String location, Schema schema) {
+ this.location = location;
+ this.schema = schema;
+ }
+
+ /**
+ * @return the output location of the store
+ */
+ public String getLocation() {
+ return this.location;
+ }
+
+ /**
+ * @param location the new output location of the store
+ */
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ /**
+ * @return the schema of the data being stored
+ */
+ public Schema getSchema() {
+ return this.schema;
+ }
+
+ /**
+ * @param schema the new schema of the data being stored
+ */
+ public void setSchema(Schema schema) {
+ this.schema = schema;
+ }
+
+ /**
+ * Renders this config as {@code {location:<location>, schema:<schema>}}.
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder("{location:")
+ .append(location)
+ .append(", schema:")
+ .append(schema)
+ .append('}')
+ .toString();
+ }
+
+}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/StoreFunc.java Wed Apr 29 22:23:55 2009
@@ -58,7 +58,24 @@
*
* @throws IOException
*/
- public abstract void finish() throws IOException;
+ public abstract void finish() throws IOException;
+
+ /**
+ * Returns a backend-specific class that prepares for storing output.
+ * <p>
+ * In the Hadoop backend this may be an OutputFormat, which is then used in
+ * place of PigOutputFormat: the framework calls this method and uses the
+ * returned class only if it implements OutputFormat. See
+ * {@link org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, String, org.apache.hadoop.util.Progressable)}
+ * for how such an OutputFormat should interact with Pig.
+ *
+ * @return the backend-specific preparation class, or null if this
+ * {@link StoreFunc} has none, in which case a default Pig
+ * implementation is used to prepare for storing output.
+ * @throws IOException if the class does not implement the expected
+ * interface(s).
+ */
+ public Class getStorePreparationClass() throws IOException;
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Apr 29 22:23:55 2009
@@ -36,12 +36,15 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -100,6 +103,8 @@
PigContext pigContext;
private final Log log = LogFactory.getLog(getClass());
+
+ public static final String PIG_STORE_CONFIG = "pig.store.config";
public static final String LOG_DIR = "_logs";
@@ -310,7 +315,6 @@
"pig.streaming.cache.files", false);
jobConf.setInputFormat(PigInputFormat.class);
- jobConf.setOutputFormat(PigOutputFormat.class);
//Process POStore and remove it from the plan
List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
@@ -328,11 +332,37 @@
st = reduceStores.remove(0);
mro.reducePlan.remove(st);
}
+
+ // If the StoreFunc associate with the POStore is implements
+ // getStorePreparationClass() and returns a non null value,
+ // then it could be wanting to implement OutputFormat for writing out to hadoop
+ // Check if this is the case, if so, use the OutputFormat class the
+ // StoreFunc gives us else use our default PigOutputFormat
+ Object storeFunc = PigContext.instantiateFuncFromSpec(st.getSFile().getFuncSpec());
+ Class sPrepClass = null;
+ try {
+ sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+ } catch(AbstractMethodError e) {
+ // this is for backward compatibility wherein some old StoreFunc
+ // which does not implement getStorePreparationClass() is being
+ // used. In this case, we want to just use PigOutputFormat
+ sPrepClass = null;
+ }
+ if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+ jobConf.setOutputFormat(sPrepClass);
+ } else {
+ jobConf.setOutputFormat(PigOutputFormat.class);
+ }
+
+ //set out filespecs
String outputPath = st.getSFile().getFileName();
FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+
jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-
+ jobConf.set(PIG_STORE_CONFIG,
+ ObjectSerializer.serialize(new StoreConfig(outputPath, st.getSchema())));
+
jobConf.set("pig.streaming.log.dir",
new Path(outputPath, LOG_DIR).toString());
jobConf.set("pig.streaming.task.output.dir", outputPath);
@@ -349,6 +379,7 @@
fs.mkdirs(tmpOut);
}
+ jobConf.setOutputFormat(PigOutputFormat.class);
FileOutputFormat.setOutputPath(jobConf, curTmpPath);
jobConf.set("pig.streaming.log.dir",
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -36,10 +36,13 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
@@ -65,6 +68,7 @@
private JobConf job;
private final Log log = LogFactory.getLog(getClass());
+ public static final String PIG_STORE_CONFIG = "pig.store.config";
public MapReducePOStoreImpl(JobConf job) {
this.job = job;
@@ -75,14 +79,33 @@
}
@Override
- public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+ public StoreFunc createStoreFunc(FileSpec sFile, Schema schema)
+ throws IOException {
// set up a new job conf
JobConf outputConf = new JobConf(job);
String tmpPath = PlanHelper.makeStoreTmpPath(sFile.getFileName());
- // Right now we're always using PigOutputFormat.
- outputConf.setOutputFormat(PigOutputFormat.class);
+ // If the StoreFunc associate with the POStore is implements
+ // getStorePreparationClass() and returns a non null value,
+ // then it could be wanting to implement OutputFormat for writing out to hadoop
+ // Check if this is the case, if so, use the OutputFormat class the
+ // StoreFunc gives us else use our default PigOutputFormat
+ Object storeFunc = PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ Class sPrepClass = null;
+ try {
+ sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+ } catch(AbstractMethodError e) {
+ // this is for backward compatibility wherein some old StoreFunc
+ // which does not implement getStorePreparationClass() is being
+ // used. In this case, we want to just use PigOutputFormat
+ sPrepClass = null;
+ }
+ if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+ outputConf.setOutputFormat(sPrepClass);
+ } else {
+ outputConf.setOutputFormat(PigOutputFormat.class);
+ }
// PigOuputFormat will look for pig.storeFunc to actually
// write stuff out.
@@ -94,6 +117,10 @@
Path outputDir = new Path(sFile.getFileName()).makeQualified(FileSystem.get(outputConf));
outputConf.set("mapred.output.dir", outputDir.toString());
+ // Set the schema
+ outputConf.set(PIG_STORE_CONFIG,
+ ObjectSerializer.serialize(new StoreConfig(outputDir.toString(), schema)));
+
// The workpath is set to a unique-per-store subdirectory of
// the current working directory.
String workPath = outputConf.get("mapred.work.output.dir");
@@ -168,5 +195,10 @@
@Override
public void finish() throws IOException {
}
+
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ return null; // no custom preparation class; the default is used
+ }
}
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Wed Apr 29 22:23:55 2009
@@ -33,6 +33,7 @@
import org.apache.pig.PigException;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -47,6 +48,24 @@
public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
public static final String PIG_OUTPUT_FUNC = "pig.output.func";
+ /**
+ * In general, the mechanism for an OutputFormat in Pig to get hold of the storeFunc
+ * and the metadata information (for now schema and location provided for the store in
+ * the pig script) is through the following Utility static methods:
+ * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(JobConf)}
+ * - this will get the {@link org.apache.pig.StoreFunc} reference to use in the RecordWriter.write()
+ * {@link MapRedUtil#getStoreConfig(JobConf)} - this will get the {@link org.apache.pig.StoreConfig}
+ * reference which has metadata like the location (the string supplied with store statement in the script)
+ * and the {@link org.apache.pig.impl.logicalLayer.schema.Schema} of the data. The OutputFormat
+ * should NOT use the location in the StoreConfig to write the output if the location represents a
+ * Hadoop dfs path. This is because when "speculative execution" is turned on in Hadoop, multiple
+ * attempts for the same task (for a given partition) may be running at the same time. So using the
+ * location will mean that these different attempts will over-write each other's output.
+ * The OutputFormat should use
+ * {@link org.apache.hadoop.mapred.FileOutputFormat#getWorkOutputPath(JobConf)}
+ * which will provide a safe output directory into which the OutputFormat should write
+ * the part file (given by the name argument in the getRecordWriter() call).
+ */
public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
String name, Progressable progress) throws IOException {
Path outputDir = FileOutputFormat.getWorkOutputPath(job);
@@ -56,20 +75,7 @@
public PigRecordWriter getRecordWriter(FileSystem fs, JobConf job,
Path outputDir, String name, Progressable progress)
throws IOException {
- StoreFunc store;
- String storeFunc = job.get("pig.storeFunc", "");
- if (storeFunc.length() == 0) {
- store = new PigStorage();
- } else {
- try {
- store = (StoreFunc) PigContext
- .instantiateFuncFromSpec(storeFunc);
- } catch (Exception e) {
- int errCode = 2081;
- String msg = "Unable to setup the store function.";
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
- }
+ StoreFunc store = MapRedUtil.getStoreFunc(job);
String parentName = FileOutputFormat.getOutputPath(job).getName();
int suffixStart = parentName.lastIndexOf('.');
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Apr 29 22:23:55 2009
@@ -46,6 +46,7 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -1197,6 +1198,20 @@
.getNextNodeId(scope)));
store.setSFile(loStore.getOutputFile());
store.setInputSpec(loStore.getInputSpec());
+ try {
+ // create a new schema for ourselves so that when
+ // we serialize we are not serializing objects that
+ // contain the schema - apparently Java tries to
+ // serialize the object containing the schema if
+ // we are trying to serialize the schema reference in
+ // the containing object. The schema here will be serialized
+ // in JobControlCompiler
+ store.setSchema(new Schema(loStore.getSchema()));
+ } catch (FrontendException e1) {
+ int errorCode = 1060;
+ String message = "Cannot resolve Store output schema";
+ throw new VisitorException(message, errorCode, PigException.BUG, e1);
+ }
currentPlan.add(store);
List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Wed Apr 29 22:23:55 2009
@@ -30,6 +30,7 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -53,6 +54,7 @@
private final Log log = LogFactory.getLog(getClass());
private POStoreImpl impl;
private FileSpec sFile;
+ private Schema schema;
// flag to distinguish user stores from MRCompiler stores.
private boolean isTmpStore;
@@ -81,7 +83,7 @@
public void setUp() throws IOException{
if (impl != null) {
try{
- storer = impl.createStoreFunc(sFile);
+ storer = impl.createStoreFunc(sFile, schema);
}catch (IOException ioe) {
int errCode = 2081;
String msg = "Unable to setup the store function.";
@@ -184,4 +186,12 @@
public void setStoreImpl(POStoreImpl impl) {
this.impl = impl;
}
+
+ /**
+ * Sets the schema of the data this store writes. setUp() passes it on to
+ * {@link POStoreImpl#createStoreFunc(FileSpec, Schema)}.
+ *
+ * @param schema schema of the stored data
+ */
+ public void setSchema(Schema schema) {
+ this.schema = schema;
+ }
+
+ /**
+ * @return the schema of the data this store writes
+ */
+ public Schema getSchema() {
+ return this.schema;
+ }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.pig.StoreFunc;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
* This class is used to specify the actual behavior of the store
@@ -31,7 +32,8 @@
* @param sFile - The file the store should write to
* @throws IOException
*/
- public abstract StoreFunc createStoreFunc(FileSpec sFile) throws IOException;
+ public abstract StoreFunc createStoreFunc(FileSpec sFile, Schema schema)
+ throws IOException;
/**
* At the end of processing, the outputstream is closed
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=769970&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Apr 29 22:23:55 2009
@@ -0,0 +1,68 @@
+/**
+ * MapRedUtil: static helpers for the Hadoop map-reduce backend.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * Static utility methods for the Hadoop map-reduce backend. They are mainly
+ * used by OutputFormats that need the StoreFunc and store metadata that Pig
+ * placed in the job configuration.
+ */
+public class MapRedUtil {
+
+ /**
+ * Obtains the {@link org.apache.pig.StoreFunc} that an OutputFormat should
+ * use to perform its write() operations. Intended to be called from
+ * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(FileSystem, JobConf, String, Progressable)}.
+ * <p>
+ * The function is instantiated from the spec stored under the
+ * {@code pig.storeFunc} property; when that property is unset or empty a
+ * default {@link PigStorage} is returned instead.
+ *
+ * @param conf the JobConf object
+ * @return the StoreFunc reference
+ * @throws ExecException (error code 2081) if the store function cannot be
+ * instantiated from its spec
+ */
+ public static StoreFunc getStoreFunc(JobConf conf) throws ExecException {
+ String funcSpec = conf.get("pig.storeFunc", "");
+ if (funcSpec.length() == 0) {
+ // nothing configured: fall back to the default store function
+ return new PigStorage();
+ }
+ try {
+ return (StoreFunc) PigContext.instantiateFuncFromSpec(funcSpec);
+ } catch (Exception e) {
+ int errCode = 2081;
+ String msg = "Unable to setup the store function.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /**
+ * Obtains the {@link org.apache.pig.StoreConfig}, which carries metadata
+ * such as the schema and location of the data being stored. Intended to be
+ * called from
+ * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(FileSystem, JobConf, String, Progressable)}.
+ *
+ * @param conf the JobConf object
+ * @return the StoreConfig deserialized from the
+ * {@link JobControlCompiler#PIG_STORE_CONFIG} property
+ * @throws IOException propagated from ObjectSerializer.deserialize
+ */
+ public static StoreConfig getStoreConfig(JobConf conf) throws IOException {
+ String serialized = conf.get(JobControlCompiler.PIG_STORE_CONFIG);
+ return (StoreConfig) ObjectSerializer.deserialize(serialized);
+ }
+}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java Wed Apr 29 22:23:55 2009
@@ -24,6 +24,7 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
/**
@@ -43,7 +44,8 @@
}
@Override
- public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+ public StoreFunc createStoreFunc(FileSpec sFile, Schema schema)
+ throws IOException {
this.sFile = sFile;
storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
os = FileLocalizer.create(sFile.getFileName(), pc);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java Wed Apr 29 22:23:55 2009
@@ -393,4 +393,13 @@
public boolean equals(Object obj) {
return true;
}
+
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
+ }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinaryStorage.java Wed Apr 29 22:23:55 2009
@@ -159,4 +159,13 @@
// TODO Auto-generated method stub
}
+
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
+ }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigDump.java Wed Apr 29 22:23:55 2009
@@ -41,6 +41,15 @@
public void putNext(Tuple f) throws IOException {
os.write((f.toString() + recordDelimiter).getBytes());
+ }
+
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
}
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/PigStorage.java Wed Apr 29 22:23:55 2009
@@ -327,5 +327,14 @@
return this.fieldDel == other.fieldDel;
}
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
+ }
+
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java Wed Apr 29 22:23:55 2009
@@ -451,7 +451,7 @@
planTester.buildPlan("a = load 'input';");
LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
- POStore store = GenPhyOp.topStoreOp();
+ POStore store = GenPhyOp.dummyPigStorageOp();
pp.addAsLeaf(store);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocal.java Wed Apr 29 22:23:55 2009
@@ -288,6 +288,15 @@
return null;
}
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
+ }
+
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMapReduce.java Wed Apr 29 22:23:55 2009
@@ -313,6 +313,15 @@
return null;
}
+ /**
+ * No custom store preparation class: the framework uses its default.
+ */
+ @Override
+ public Class getStorePreparationClass() throws IOException {
+ // null => the map-reduce backend falls back to PigOutputFormat
+ return null;
+ }
+
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java?rev=769970&r1=769969&r2=769970&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java Wed Apr 29 22:23:55 2009
@@ -26,6 +26,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -37,9 +38,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.impl.plan.PlanException;
@@ -755,6 +753,12 @@
PORead ret = new PORead(new OperatorKey("", r.nextLong()), bag);
return ret;
}
+
+ /**
+ * Creates a POStore with a random operator key whose output file spec is
+ * "DummyFil", stored with {@link PigStorage}.
+ *
+ * @return the new store operator
+ */
+ public static POStore dummyPigStorageOp() {
+ POStore store = new POStore(new OperatorKey("", r.nextLong()));
+ String spec = PigStorage.class.getName() + "()";
+ store.setSFile(new FileSpec("DummyFil", new FuncSpec(spec)));
+ return store;
+ }
public static POStore topStoreOp() {
POStore ret = new POStore(new OperatorKey("", r.nextLong()));