You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/01/15 00:17:43 UTC
svn commit: r899461 - in /hadoop/pig/branches/load-store-redesign:
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/...
Author: daijy
Date: Thu Jan 14 23:17:42 2010
New Revision: 899461
URL: http://svn.apache.org/viewvc?rev=899461&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces--PIG-1090-12.patch
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/PigServer.java Thu Jan 14 23:17:42 2010
@@ -524,7 +524,7 @@
}
}
- LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf);
+ LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, id);
List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
if (jobs.size() < 1) {
throw new IOException("Couldn't retrieve job.");
@@ -751,7 +751,7 @@
}
lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
- PigStorage.class.getName(), leaf);
+ PigStorage.class.getName(), leaf, "fake");
}
return lp;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Thu Jan 14 23:17:42 2010
@@ -18,6 +18,7 @@
package org.apache.pig;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -25,8 +26,9 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-public class ResourceSchema {
+public class ResourceSchema implements Serializable {
+ private static final long serialVersionUID = 1L;
/* Array Getters intentionally return mutable arrays instead of copies,
* to simplify updates without unnecessary copying.
* Setters make a copy of the arrays in order to prevent an array
@@ -44,7 +46,8 @@
private int version = 0;
- public static class ResourceFieldSchema {
+ public static class ResourceFieldSchema implements Serializable {
+ private static final long serialVersionUID = 1L;
private String name;
// values are constants from DataType
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Thu Jan 14 23:17:42 2010
@@ -72,9 +72,7 @@
* <b>store A into 'bla'</b>
* then 'bla' is the location. This location should be either a file name
* or a URI. If it does not have a URI scheme Pig will assume it is a
- * filename. This will be
- * called during planning on the front end, not during execution on
- * the backend.
+ * filename. This will be called multiple times during execution on the backend.
* @param location Location indicated in store statement.
* @param job The {@link Job} object
* @throws IOException if the location is not valid.
@@ -83,7 +81,8 @@
/**
* Set the schema for data to be stored. This will be called on the
- * front end during planning. A Store function should implement this function to
+ * front end during planning if the store is associated with a schema.
+ * A Store function should implement this function to
* check that a given schema is acceptable to it. For example, it
* can check that the correct partition keys are included;
* a storage function to be written directly to an OutputFormat can
@@ -110,5 +109,13 @@
* @throws IOException if an exception occurs during the write
*/
void putNext(Tuple t) throws IOException;
-
+
+ /**
+ * This method will be called by Pig both in the front end and back end to
+ * pass a unique signature to the {@link StoreFunc} which it can use to store
+ * information in the {@link UDFContext} which it needs to store between
+ * various method invocations in the front end and back end.
+ * @param signature a unique signature to identify this StoreFunc
+ */
+ public void setStoreFuncUDFContextSignature(String signature);
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Jan 14 23:17:42 2010
@@ -33,24 +33,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
@@ -446,10 +443,18 @@
for (POStore st: mapStores) {
storeLocations.add(st);
+ StoreFunc sFunc = st.getStoreFunc();
+ //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (st.getSchema()!=null)
+ sFunc.checkSchema(new ResourceSchema(st.getSchema()));
}
for (POStore st: reduceStores) {
storeLocations.add(st);
+ StoreFunc sFunc = st.getStoreFunc();
+ //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (st.getSchema()!=null)
+ sFunc.checkSchema(new ResourceSchema(st.getSchema()));
}
// the OutputFormat we report to Hadoop is always PigOutputFormat
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Thu Jan 14 23:17:42 2010
@@ -18,33 +18,13 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.SortInfo;
-import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.ObjectSerializer;
/**
* This class is used to have a POStore write to DFS via a output
* collector/record writer. It sets up a modified job configuration to
@@ -78,10 +58,7 @@
public StoreFunc createStoreFunc(POStore store)
throws IOException {
- Configuration outputConf = context.getConfiguration();
-
StoreFunc storeFunc = store.getStoreFunc();
- Class outputFormatClass = storeFunc.getOutputFormat().getClass();
// call the setStoreLocation on the storeFunc giving it the
// Job. Typically this will result in the OutputFormat of the
@@ -92,8 +69,7 @@
PigOutputFormat.setLocation(context, store);
OutputFormat outputFormat = null;
try {
- outputFormat = (OutputFormat)ReflectionUtils.newInstance(
- outputFormatClass, outputConf);
+ outputFormat = (OutputFormat)storeFunc.getOutputFormat();
// create a new record writer
writer = outputFormat.getRecordWriter(context);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Thu Jan 14 23:17:42 2010
@@ -51,8 +51,6 @@
public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
private enum Mode { SINGLE_STORE, MULTI_STORE};
-
- private OutputCommitter committer;
/** hadoop job output directory */
public static final String MAPRED_OUTPUT_DIR = "mapred.output.dir";
@@ -67,6 +65,8 @@
public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
throws IOException, InterruptedException {
+ // Set up UDFContext so that the StoreFunc can make use of it
+ MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
List<POStore> mapStores = getStores(taskattemptcontext,
JobControlCompiler.PIG_MAP_STORES);
List<POStore> reduceStores = getStores(taskattemptcontext,
@@ -193,21 +193,19 @@
@Override
public void checkOutputSpecs(JobContext jobcontext) throws IOException, InterruptedException {
+ // Set up UDFContext so that the StoreFunc can make use of it
+ MapRedUtil.setupUDFContext(jobcontext.getConfiguration());
List<POStore> mapStores = getStores(jobcontext,
JobControlCompiler.PIG_MAP_STORES);
checkOutputSpecsHelper(mapStores, jobcontext);
List<POStore> reduceStores = getStores(jobcontext,
JobControlCompiler.PIG_REDUCE_STORES);
checkOutputSpecsHelper(reduceStores, jobcontext);
-
}
private void checkOutputSpecsHelper(List<POStore> stores, JobContext
jobcontext) throws IOException, InterruptedException {
for (POStore store : stores) {
- StoreFunc sFunc = store.getStoreFunc();
- OutputFormat of = sFunc.getOutputFormat();
-
// make a copy of the original JobContext so that
// each OutputFormat get a different copy
JobContext jobContextCopy = new JobContext(
@@ -216,6 +214,9 @@
// set output location
PigOutputFormat.setLocation(jobContextCopy, store);
+ StoreFunc sFunc = store.getStoreFunc();
+ OutputFormat of = sFunc.getOutputFormat();
+
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
of.checkOutputSpecs(jobContextCopy);
@@ -237,6 +238,9 @@
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext
taskattemptcontext) throws IOException, InterruptedException {
+ // We set up UDFContext so that StoreFunc.getOutputFormat, which is called inside
+ // the constructor of PigOutputCommitter, can make use of it
+ MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
// we return an instance of PigOutputCommitter to Hadoop - this instance
// will wrap the real OutputCommitter(s) belonging to the store(s)
return new PigOutputCommitter(taskattemptcontext);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Jan 14 23:17:42 2010
@@ -1603,6 +1603,7 @@
store.setAlias(loStore.getPlan().getPredecessors(loStore).get(0).getAlias());
store.setSFile(loStore.getOutputFile());
store.setInputSpec(loStore.getInputSpec());
+ store.setSignature(loStore.getSignature());
try {
// create a new schema for ourselves so that when
// we serialize we are not serializing objects that
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Jan 14 23:17:42 2010
@@ -38,7 +38,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Jan 14 23:17:42 2010
@@ -70,6 +70,8 @@
// column names and the asc/dsc info
private SortInfo sortInfo;
+ private String signature;
+
public POStore(OperatorKey k) {
this(k, -1, null);
}
@@ -202,8 +204,9 @@
}
public StoreFunc getStoreFunc() {
- return (StoreFunc)
- PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ sFunc.setStoreFuncUDFContextSignature(signature);
+ return sFunc;
}
/**
@@ -219,4 +222,12 @@
public SortInfo getSortInfo() {
return sortInfo;
}
+
+ public String getSignature() {
+ return signature;
+ }
+
+ public void setSignature(String signature) {
+ this.signature = signature;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Thu Jan 14 23:17:42 2010
@@ -433,4 +433,9 @@
public void setPartitionFilter(Expression plan) throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Thu Jan 14 23:17:42 2010
@@ -297,4 +297,8 @@
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java Thu Jan 14 23:17:42 2010
@@ -46,6 +46,8 @@
// FileSpec is set in PigServer.postProcess. It can be used to
// reload this store, if the optimizer has the need.
private FileSpec mInputSpec;
+
+ private String signature;
transient private StoreFunc mStoreFunc;
private static Log log = LogFactory.getLog(LOStore.class);
@@ -59,7 +61,7 @@
* the file to be stored
*/
public LOStore(LogicalPlan plan, OperatorKey key,
- FileSpec outputFileSpec) throws IOException {
+ FileSpec outputFileSpec, String alias) throws IOException {
super(plan, key);
mOutputFile = outputFileSpec;
@@ -70,13 +72,28 @@
// Also remove the commented out import org.apache.pig.impl.PigContext
try {
- mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+ mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+ this.mAlias = alias;
+ this.signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs());
+ mStoreFunc.setStoreFuncUDFContextSignature(this.signature);
} catch (Exception e) {
IOException ioe = new IOException(e.getMessage());
ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
}
+
+ private String constructSignature(String alias, String filename, String[] args) {
+ String s = alias+"_"+filename+"_";
+ if (args!=null) {
+ for (int i=0;i<args.length;i++) {
+ s = s+args[i];
+ if (i!=args.length-1)
+ s = s+"_";
+ }
+ }
+ return s;
+ }
public FileSpec getOutputFile() {
return mOutputFile;
@@ -198,5 +215,16 @@
result.add(new RequiredFields(true));
return result;
}
-
+
+ @Override
+ public void setAlias(String newAlias) {
+ super.setAlias(newAlias);
+ mStoreFunc.setStoreFuncUDFContextSignature(getAlias()+"_"+mOutputFile.getFileName());
+ signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs());
+ signature = getAlias()+"_"+mOutputFile.getFileName();
+ }
+
+ public String getSignature() {
+ return signature;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jan 14 23:17:42 2010
@@ -136,7 +136,8 @@
LogicalPlan readFrom,
String fileName,
String func,
- LogicalOperator input) throws FrontendException {
+ LogicalOperator input,
+ String alias) throws FrontendException {
if (func == null) {
func = PigStorage.class.getName();
@@ -152,7 +153,8 @@
try {
store = new LOStore(storePlan,
new OperatorKey(scope, storeNodeId),
- new FileSpec(fileName, new FuncSpec(func)));
+ new FileSpec(fileName, new FuncSpec(func)),
+ alias);
} catch (IOException ioe) {
throw new FrontendException(ioe.getMessage(), ioe);
}
@@ -2417,7 +2419,7 @@
fileNameMap.put(fileName, absolutePath);
}
LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
- new FileSpec(absolutePath, funcSpec));
+ new FileSpec(absolutePath, funcSpec), t.image);
LogicalOperator input = mapAliasOp.get(t.image);
if (input == null)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/LogUtils.java Thu Jan 14 23:17:42 2010
@@ -95,7 +95,10 @@
PigException pigException = LogUtils.getPigException(t);
if(pigException != null) {
- message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getMessage();
+ if (pigException.getCause()!=null)
+ message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getCause().getMessage();
+ else
+ message = "ERROR " + pigException.getErrorCode() + ": " + pigException.getMessage();
} else {
if((t instanceof ParseException
|| t instanceof org.apache.pig.tools.pigscript.parser.TokenMgrError
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestInputOutputFileValidator.java Thu Jan 14 23:17:42 2010
@@ -151,7 +151,7 @@
new FileSpec(outputFile, new FuncSpec("org.apache.pig.builtin.PigStorage"));
LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1,
ConfigurationUtil.toConfiguration(dfs.getConfiguration())) ;
- LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2) ;
+ LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2, "new") ;
plan.add(load) ;
plan.add(store) ;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=899461&r1=899460&r2=899461&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Thu Jan 14 23:17:42 2010
@@ -2699,6 +2699,10 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ }
}