You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/02 02:01:52 UTC
svn commit: r917827 [1/2] - in /hadoop/pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/zebra/src/java/org/apache/hadoop/zebra/pig/
contrib/zebra/src/test/org/apache/hadoop/zebra/pig/ src/org/apache/pig/
src/...
Author: pradeepkth
Date: Tue Mar 2 01:01:51 2010
New Revision: 917827
URL: http://svn.apache.org/viewvc?rev=917827&view=rev
Log:
PIG-1265: Change LoadMetadata and StoreMetadata to use Job instead of Configuration and add a cleanupOnFailure method to StoreFuncInterface (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 2 01:01:51 2010
@@ -22,6 +22,10 @@
INCOMPATIBLE CHANGES
+PIG-1265: Change LoadMetadata and StoreMetadata to use Job instead of
+Configuration and add a cleanupOnFailure method to StoreFuncInterface
+(pradeepkth)
+
PIG-1250: Make StoreFunc an abstract class and create a mirror interface
called StoreFuncInterface (pradeepkth)
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java Tue Mar 2 01:01:51 2010
@@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.StoreMetadata;
@@ -131,7 +132,7 @@
// Implementation of LoadMetaData interface
@Override
- public String[] getPartitionKeys(String location, Configuration conf) {
+ public String[] getPartitionKeys(String location, Job job) {
return null;
}
@@ -147,7 +148,8 @@
* TODO location and conf params are ignored in favor of initialization data
*/
@Override
- public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
Set<ElementDescriptor> schemaFileSet = null;
try {
schemaFileSet = findMetaFile(location, schemaFileName, conf);
@@ -188,7 +190,8 @@
* @see org.apache.pig.LoadMetadata#getStatistics(String, Configuration)
*/
@Override
- public ResourceStatistics getStatistics(String location, Configuration conf) throws IOException {
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
Set<ElementDescriptor> statFileSet = null;
try {
statFileSet = findMetaFile(location, statFileName, conf);
@@ -224,7 +227,8 @@
// Implementation of StoreMetaData interface
@Override
- public void storeStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException {
+ public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
ElementDescriptor statFilePath = storage.asElement(location, statFileName);
if(!statFilePath.exists() && stats != null) {
@@ -241,7 +245,8 @@
}
@Override
- public void storeSchema(ResourceSchema schema, String location, Configuration conf) throws IOException {
+ public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
ElementDescriptor schemaFilePath = storage.asElement(location, schemaFileName);
if(!schemaFilePath.exists() && schema != null) {
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java Tue Mar 2 01:01:51 2010
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
@@ -55,13 +56,13 @@
@Override
public ResourceSchema getSchema(String location,
- Configuration conf) throws IOException {
- return (new JsonMetadata()).getSchema(location, conf);
+ Job job) throws IOException {
+ return (new JsonMetadata()).getSchema(location, job);
}
@Override
public ResourceStatistics getStatistics(String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
return null;
}
@@ -71,7 +72,7 @@
}
@Override
- public String[] getPartitionKeys(String location, Configuration conf)
+ public String[] getPartitionKeys(String location, Job job)
throws IOException {
return null;
}
@@ -81,18 +82,18 @@
@Override
public void storeSchema(ResourceSchema schema, String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
JsonMetadata metadataWriter = new JsonMetadata();
byte fieldDel = '\t';
byte recordDel = '\n';
metadataWriter.setFieldDel(fieldDel);
metadataWriter.setRecordDel(recordDel);
- metadataWriter.storeSchema(schema, location, conf);
+ metadataWriter.storeSchema(schema, location, job);
}
@Override
public void storeStatistics(ResourceStatistics stats, String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Tue Mar 2 01:01:51 2010
@@ -280,15 +280,14 @@
}
@Override
- public String[] getPartitionKeys(String location, Configuration conf)
+ public String[] getPartitionKeys(String location, Job job)
throws IOException {
return null;
}
@Override
- public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
- Path[] paths = getPathsFromLocation( location, conf );
- Job job = new Job(conf);
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ Path[] paths = getPathsFromLocation( location, job.getConfiguration());
TableInputFormat.setInputPaths( job, paths );
Schema tableSchema = null;
@@ -325,7 +324,7 @@
}
@Override
- public ResourceStatistics getStatistics(String location, Configuration conf)
+ public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
// Statistics is not supported.
return null;
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Mar 2 01:01:51 2010
@@ -163,10 +163,11 @@
}
@Override
- public void storeSchema(ResourceSchema schema, String location, Configuration conf)
+ public void storeSchema(ResourceSchema schema, String location, Job job)
throws IOException {
// no-op. We do close at cleanupJob().
- BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
+ BasicTable.Writer write = new BasicTable.Writer( new Path( location ),
+ job.getConfiguration());
write.close();
}
@@ -177,7 +178,7 @@
@Override
public void storeStatistics(ResourceStatistics stats, String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
// no-op
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Tue Mar 2 01:01:51 2010
@@ -437,15 +437,19 @@
System.out.println(RowValue);
}
- Path newPath = new Path(getCurrentMethodName());
- ExecJob pigJob = pigServer
- .store(
- "records",
- new Path(newPath, "store").toString(),
- TableStorer.class.getCanonicalName()
- + "('[s7, s2]; [s3, s4]')");
- Assert.assertNotNull(pigJob.getException());
- System.out.println(pigJob.getException());
+ try {
+ Path newPath = new Path(getCurrentMethodName());
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ new Path(newPath, "store").toString(),
+ TableStorer.class.getCanonicalName()
+ + "('[s7, s2]; [s3, s4]')");
+ } catch (Exception e) {
+ System.out.println(e);
+ return;
+ }
+ Assert.fail("Exception expected");
}
@Test
@@ -462,16 +466,20 @@
System.out.println(RowValue);
}
- Path newPath = new Path(getCurrentMethodName());
-
- ExecJob pigJob = pigServer
- .store(
- "records",
- new Path(newPath, "store").toString(),
- TableStorer.class.getCanonicalName()
- + "('[s1, s2]; [s1, s4]')");
- Assert.assertNotNull(pigJob.getException());
- System.out.println(pigJob.getException());
+ try {
+ Path newPath = new Path(getCurrentMethodName());
+
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ new Path(newPath, "store").toString(),
+ TableStorer.class.getCanonicalName()
+ + "('[s1, s2]; [s1, s4]')");
+ } catch(Exception e) {
+ System.out.println(e);
+ return;
+ }
+ Assert.fail("Exception expected");
}
@Test
@@ -487,17 +495,21 @@
Tuple RowValue = it.next();
System.out.println(RowValue);
}
+ try{
+ Path newPath = new Path(getCurrentMethodName());
+
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ new Path(newPath, "store").toString(),
+ TableStorer.class.getCanonicalName()
+ + "('[s1]; [s1]')");
+ } catch(Exception e) {
+ System.out.println(e);
+ return;
+ }
+ Assert.fail("Exception expected");
- Path newPath = new Path(getCurrentMethodName());
-
- ExecJob pigJob = pigServer
- .store(
- "records",
- new Path(newPath, "store").toString(),
- TableStorer.class.getCanonicalName()
- + "('[s1]; [s1]')");
- Assert.assertNotNull(pigJob.getException());
- System.out.println(pigJob.getException());
}
// @Test
@@ -541,13 +553,17 @@
// Use pig STORE to store testing data
//
System.out.println("path = " + path);
- ExecJob pigJob = pigServer
- .store(
- "records",
- path.toString(),
- TableStorer.class.getCanonicalName()
- + "('[s1, s2]; [s3, s4]')");
- Assert.assertNotNull(pigJob.getException());
- System.out.println("pig job exception : " + pigJob.getException());
+ try {
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ path.toString(),
+ TableStorer.class.getCanonicalName()
+ + "('[s1, s2]; [s3, s4]')");
+ } catch(Exception e) {
+ System.out.println(e);
+ return;
+ }
+ Assert.fail("Exception expected");
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/LoadMetadata.java Tue Mar 2 01:01:51 2010
@@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
/**
* This interface defines how to retrieve metadata related to data to be loaded.
@@ -32,14 +33,16 @@
* Get a schema for the data to be loaded.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
- * @param conf The {@link Configuration} object
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
* @return schema for the data to be loaded. This schema should represent
* all tuples of the returned data. If the schema is unknown or it is
* not possible to return a schema that represents all returned data,
* then null should be returned.
* @throws IOException if an exception occurs while determining the schema
*/
- ResourceSchema getSchema(String location, Configuration conf) throws
+ ResourceSchema getSchema(String location, Job job) throws
IOException;
/**
@@ -47,31 +50,35 @@
* available, then null should be returned.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
- * @param conf The {@link Configuration} object
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
* @return statistics about the data to be loaded. If no statistics are
* available, then null should be returned.
* @throws IOException if an exception occurs while retrieving statistics
*/
- ResourceStatistics getStatistics(String location, Configuration conf)
+ ResourceStatistics getStatistics(String location, Job job)
throws IOException;
/**
* Find what columns are partition keys for this input.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
- * @param conf The {@link Configuration} object
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
* @return array of field names of the partition keys. Implementations
* should return null to indicate that there are no partition keys
* @throws IOException if an exception occurs while retrieving partition keys
*/
- String[] getPartitionKeys(String location, Configuration conf)
+ String[] getPartitionKeys(String location, Job job)
throws IOException;
/**
* Set the filter for partitioning. It is assumed that this filter
* will only contain references to fields given as partition keys in
* getPartitionKeys. So if the implementation returns null in
- * {@link #getPartitionKeys(String, Configuration)}, then this method is not
+ * {@link #getPartitionKeys(String, Job)}, then this method is not
* called by pig runtime. This method is also not called by the pig runtime
* if there are no partition filter conditions.
* @param partitionFilter that describes filter for partitioning
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Mar 2 01:01:51 2010
@@ -36,6 +36,7 @@
import java.util.Set;
import java.util.Stack;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.pig.impl.plan.PlanException;
@@ -70,7 +71,9 @@
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -539,7 +542,7 @@
}
try {
- LogicalPlan lp = compileLp(id);
+ LogicalPlan lp = clonePlan(id);
// MRCompiler needs a store to be the leaf - hence
// add a store to the plan to explain
@@ -557,7 +560,8 @@
}
}
- LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, leaf.getAlias(), pigContext);
+ LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, leaf.getAlias(), pigContext);
+ LogicalPlan storePlan = compileLp(unCompiledstorePlan, true);
List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
if (jobs.size() < 1) {
throw new IOException("Couldn't retrieve job.");
@@ -813,8 +817,13 @@
List<ExecJob> execJobs = pigContext.getExecutionEngine().execute(pp, "job_pigexec_");
for (ExecJob execJob: execJobs) {
if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED) {
- FileLocalizer.triggerDeleteOnFail();
- break;
+ POStore store = execJob.getPOStore();
+ try {
+ store.getStoreFunc().cleanupOnFailure(store.getSFile().getFileName(),
+ new Job(ConfigurationUtil.toConfiguration(execJob.getConfiguration())));
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
}
}
return execJobs;
@@ -841,39 +850,21 @@
String msg = "Unable to clone plan before compiling";
throw new FrontendException(msg, errCode, PigException.BUG, e);
}
-
+ return compileLp(lpClone, optimize);
+ }
+
+ @SuppressWarnings("unchecked")
+ private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
+ FrontendException {
// Set the logical plan values correctly in all the operators
- PlanSetter ps = new PlanSetter(lpClone);
+ PlanSetter ps = new PlanSetter(lp);
ps.visit();
- SortInfoSetter sortInfoSetter = new SortInfoSetter(lpClone);
- sortInfoSetter.visit();
-
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
- FrontendException caught = null;
- try {
- LogicalPlanValidationExecutor validator =
- new LogicalPlanValidationExecutor(lpClone, pigContext);
- validator.validate(lpClone, collector);
- } catch (FrontendException fe) {
- // Need to go through and see what the collector has in it. But
- // remember what we've caught so we can wrap it into what we
- // throw.
- caught = fe;
- }
-
- if(aggregateWarning) {
- CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
- } else {
- for(Enum type: MessageType.values()) {
- CompilationMessageCollector.logAllMessages(collector, log);
- }
- }
+ boolean isBeforeOptimizer = true;
+ validate(lp, collector, isBeforeOptimizer);
- if (caught != null) {
- throw caught;
- }
// optimize
if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
@@ -888,11 +879,19 @@
throw new FrontendException(msg, errCode, PigException.BUG, ioe);
}
- LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType(), optimizerRules);
+ LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
optimizer.optimize();
}
- return lpClone;
+ // compute whether output data is sorted or not
+ SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
+ sortInfoSetter.visit();
+
+ // run validations to be done after optimization
+ isBeforeOptimizer = false;
+ validate(lp, collector, isBeforeOptimizer);
+
+ return lp;
}
private PhysicalPlan compilePp(LogicalPlan lp) throws ExecException {
@@ -904,6 +903,32 @@
return pp;
}
+ private void validate(LogicalPlan lp, CompilationMessageCollector collector,
+ boolean isBeforeOptimizer) throws FrontendException {
+ FrontendException caught = null;
+ try {
+ LogicalPlanValidationExecutor validator =
+ new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
+ validator.validate(lp, collector);
+ } catch (FrontendException fe) {
+ // Need to go through and see what the collector has in it. But
+ // remember what we've caught so we can wrap it into what we
+ // throw.
+ caught = fe;
+ }
+
+ if(aggregateWarning) {
+ CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
+ } else {
+ for(Enum type: MessageType.values()) {
+ CompilationMessageCollector.logAllMessages(collector, log);
+ }
+ }
+
+ if (caught != null) {
+ throw caught;
+ }
+ }
private LogicalPlan getPlanFromAlias(
String alias,
String operation) throws FrontendException {
@@ -1128,6 +1153,7 @@
}
}
+ @Override
protected Graph clone() {
// There are two choices on how we clone the logical plan
// 1 - we really clone each operator and connect up the cloned operators
Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java Tue Mar 2 01:01:51 2010
@@ -19,12 +19,12 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
@@ -54,6 +54,7 @@
* @throws IOException
* @throws IOException if the conversion is not possible
*/
+ @Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
@@ -71,12 +72,9 @@
public abstract OutputFormat getOutputFormat() throws IOException;
/**
- * Communicate to the store function the location used in Pig Latin to refer
- * to the object(s) being stored. That is, if the PL script is
- * <b>store A into 'bla'</b>
- * then 'bla' is the location. This location should be either a file name
- * or a URI. If it does not have a URI scheme Pig will assume it is a
- * filename.
+ * Communicate to the storer the location where the data needs to be stored.
+ * The location string passed to the {@link StoreFunc} here is the
+ * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
* This method will be called in the frontend and backend multiple times. Implementations
* should bear in mind that this method is called multiple times and should
* ensure there are no inconsistent side effects due to the multiple calls.
@@ -84,7 +82,8 @@
* {@link #setStoreLocation(String, Job)}.
*
- * @param location Location indicated in store statement.
+ * @param location Location returned by
+ * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
* @param job The {@link Job} object
* @throws IOException if the location is not valid.
*/
@@ -102,6 +101,7 @@
* @throws IOException if this schema is not acceptable. It should include
* a detailed error message indicating what is wrong with the schema.
*/
+ @Override
public void checkSchema(ResourceSchema s) throws IOException {
// default implementation is a no-op
}
@@ -131,7 +131,41 @@
* will be called before other methods in {@link StoreFunc}.
* @param signature a unique signature to identify this StoreFunc
*/
+ @Override
public void setStoreFuncUDFContextSignature(String signature) {
// default implementation is a no-op
}
+
+ /**
+ * This method will be called by Pig if the job which contains this store
+ * fails. Implementations can clean up output locations in this method to
+ * ensure that no incorrect/incomplete results are left in the output location.
+ * The implementation in {@link StoreFunc} deletes the output location if it
+ * is a {@link FileSystem} location.
+ * @param location Location returned by
+ * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
+ */
+ @Override
+ public void cleanupOnFailure(String location, Job job)
+ throws IOException {
+ cleanupOnFailureImpl(location, job);
+ }
+
+ /**
+ * Implementation for {@link #cleanupOnFailure(String, Job)}
+ * @param location
+ * @param job
+ * @throws IOException
+ */
+ public static void cleanupOnFailureImpl(String location, Job job)
+ throws IOException {
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ Path path = new Path(location);
+ if(fs.exists(path)){
+ fs.delete(path, true);
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java Tue Mar 2 01:01:51 2010
@@ -68,18 +68,16 @@
OutputFormat getOutputFormat() throws IOException;
/**
- * Communicate to the store function the location used in Pig Latin to refer
- * to the object(s) being stored. That is, if the PL script is
- * <b>store A into 'bla'</b>
- * then 'bla' is the location. This location should be either a file name
- * or a URI. If it does not have a URI scheme Pig will assume it is a
- * filename.
+ * Communicate to the storer the location where the data needs to be stored.
+ * The location string passed to the {@link StoreFuncInterface} here is the
+ * return value of {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
* This method will be called in the frontend and backend multiple times. Implementations
* should bear in mind that this method is called multiple times and should
* ensure there are no inconsistent side effects due to the multiple calls.
*
- * @param location Location indicated in store statement.
+ * @param location Location returned by
+ * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
* @param job The {@link Job} object
* @throws IOException if the location is not valid.
*/
@@ -124,4 +122,16 @@
* @param signature a unique signature to identify this StoreFuncInterface
*/
public void setStoreFuncUDFContextSignature(String signature);
+
+ /**
+ * This method will be called by Pig if the job which contains this store
+ * fails. Implementations can clean up output locations in this method to
+ * ensure that no incorrect/incomplete results are left in the output location
+ * @param location Location returned by
+ * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
+ */
+ void cleanupOnFailure(String location, Job job) throws IOException;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreMetadata.java Tue Mar 2 01:01:51 2010
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
@@ -34,15 +35,19 @@
/**
* Store statistics about the data being written.
- *
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
* @throws IOException
*/
- void storeStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException;
+ void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException;
/**
* Store schema of the data being written
- *
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
* @throws IOException
*/
- void storeSchema(ResourceSchema schema, String location, Configuration conf) throws IOException;
+ void storeSchema(ResourceSchema schema, String location, Job job) throws IOException;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecJob.java Tue Mar 2 01:01:51 2010
@@ -22,6 +22,7 @@
import java.util.Properties;
import java.io.OutputStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStats;
@@ -80,6 +81,12 @@
* @return statistics relevant to the execution engine
*/
public PigStats getStatistics();
+
+ /**
+ *
+ * @return {@link POStore} object associated with the store
+ */
+ public POStore getPOStore();
/**
* hook for asynchronous notification of job completion pushed from the back-end
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Mar 2 01:01:51 2010
@@ -306,14 +306,16 @@
try {
PigStats stats = launcher.launchPig(plan, jobName, pigContext);
- for (FileSpec spec: launcher.getSucceededFiles()) {
+ for (POStore store: launcher.getSucceededFiles()) {
+ FileSpec spec = store.getSFile();
String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
- jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec, alias, stats));
+ jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, store, alias, stats));
}
- for (FileSpec spec: launcher.getFailedFiles()) {
+ for (POStore store: launcher.getFailedFiles()) {
+ FileSpec spec = store.getSFile();
String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
- HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, spec, alias, stats);
+ HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, store, alias, stats);
j.setException(launcher.getError(spec));
jobs.add(j);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Tue Mar 2 01:01:51 2010
@@ -19,23 +19,20 @@
package org.apache.pig.backend.hadoop.executionengine;
import java.io.OutputStream;
-import java.io.InputStream;
import java.util.Iterator;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.tools.pigstats.PigStats;
@@ -49,26 +46,29 @@
protected FileSpec outFileSpec;
protected Exception backendException;
protected String alias;
+ protected POStore poStore;
private PigStats stats;
public HJob(JOB_STATUS status,
PigContext pigContext,
- FileSpec outFileSpec,
+ POStore store,
String alias) {
this.status = status;
this.pigContext = pigContext;
- this.outFileSpec = outFileSpec;
+ this.poStore = store;
+ this.outFileSpec = poStore.getSFile();
this.alias = alias;
}
public HJob(JOB_STATUS status,
PigContext pigContext,
- FileSpec outFileSpec,
+ POStore store,
String alias,
PigStats stats) {
this.status = status;
this.pigContext = pigContext;
- this.outFileSpec = outFileSpec;
+ this.poStore = store;
+ this.outFileSpec = poStore.getSFile();
this.alias = alias;
this.stats = stats;
}
@@ -143,8 +143,7 @@
}
public Properties getConfiguration() {
- Properties props = new Properties();
- return props;
+ return pigContext.getProperties();
}
public PigStats getStatistics() {
@@ -184,4 +183,12 @@
public String getAlias() throws ExecException {
return alias;
}
+
+ /**
+ * @return the poStore
+ */
+ @Override
+ public POStore getPOStore() {
+ return poStore;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Mar 2 01:01:51 2010
@@ -133,7 +133,6 @@
*/
public static final String PIG_MAP_STORES = "pig.map.stores";
public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
- public static final String PIG_STORE_FUNC = "pig.storeFunc";
// A mapping of job to pair of store locations and tmp locations for that job
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
@@ -460,13 +459,6 @@
String outputPath = st.getSFile().getFileName();
FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
- // serialize the store func spec using ObjectSerializer
- // ObjectSerializer.serialize() uses default java serialization
- // and then further encodes the output so that control characters
- // get encoded as regular characters. Otherwise any control characters
- // in the store funcspec would break the job.xml which is created by
- // hadoop from the jobconf.
- conf.set(PIG_STORE_FUNC, ObjectSerializer.serialize(outputFuncSpec.toString()));
conf.set("pig.streaming.log.dir",
new Path(outputPath, LOG_DIR).toString());
conf.set("pig.streaming.task.output.dir", outputPath);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Tue Mar 2 01:01:51 2010
@@ -20,17 +20,16 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
@@ -40,17 +39,13 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
public abstract class Launcher {
@@ -62,8 +57,8 @@
boolean outOfMemory = false;
static final String OOM_ERR = "OutOfMemoryError";
- protected List<FileSpec> succeededStores = null;
- protected List<FileSpec> failedStores = null;
+ protected List<POStore> succeededStores = null;
+ protected List<POStore> failedStores = null;
protected Launcher(){
totalHadoopTimeSpent = 0;
@@ -75,21 +70,19 @@
}
/**
- * Returns a list of locations of results that have been
- * successfully completed.
- * @return A list of filspecs that corresponds to the locations of
- * the successful stores.
+ *
+ * @return A list of {@link POStore} objects corresponding to the store
+ * statements that were successful
*/
- public List<FileSpec> getSucceededFiles() {
+ public List<POStore> getSucceededFiles() {
return succeededStores;
}
/**
- * Returns a list of locations of results that have failed.
- * @return A list of filspecs that corresponds to the locations of
- * the failed stores.
+ * @return A list of {@link POStore} objects corresponding to the store
+ * statements that failed
*/
- public List<FileSpec> getFailedFiles() {
+ public List<POStore> getFailedFiles() {
return failedStores;
}
@@ -97,8 +90,8 @@
* Resets the state after a launch
*/
public void reset() {
- succeededStores = new LinkedList<FileSpec>();
- failedStores = new LinkedList<FileSpec>();
+ succeededStores = new LinkedList<POStore>();
+ failedStores = new LinkedList<POStore>();
}
/**
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Mar 2 01:01:51 2010
@@ -276,9 +276,8 @@
finalStores++;
log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
}
- failedStores.add(st.getSFile());
+ failedStores.add(st);
failureMap.put(st.getSFile(), backendException);
- FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
//log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
}
}
@@ -304,7 +303,7 @@
storeSchema(job, st);
}
if (!st.isTmpStore()) {
- succeededStores.add(st.getSFile());
+ succeededStores.add(st);
finalStores++;
log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Mar 2 01:01:51 2010
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -129,9 +130,7 @@
// call setLocation() on the storeFunc so that if there are any
// side effects like setting map.output.dir on the Configuration
// in the Context are needed by the OutputCommitter, those actions
- // will be done before the committer is created. Also the String
- // version of StoreFunc for the specific store need
- // to be set up in the context in case the committer needs them
+ // will be done before the committer is created.
PigOutputFormat.setLocation(contextCopy, store);
return contextCopy;
}
@@ -161,14 +160,11 @@
if (schema != null) {
((StoreMetadata) storeFunc).storeSchema(
new ResourceSchema(schema, store.getSortInfo()), store.getSFile()
- .getFileName(), conf);
+ .getFileName(), new Job(conf));
}
}
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext)
- */
@Override
public void cleanupJob(JobContext context) throws IOException {
// call clean up on all map and reduce committers
@@ -188,9 +184,6 @@
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
@@ -210,9 +203,6 @@
}
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
@@ -232,9 +222,6 @@
}
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
@Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
@@ -260,9 +247,6 @@
}
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#setupJob(org.apache.hadoop.mapreduce.JobContext)
- */
@Override
public void setupJob(JobContext context) throws IOException {
// call set up on all map and reduce committers
@@ -279,9 +263,6 @@
}
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
if(context.getTaskAttemptID().isMap()) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Mar 2 01:01:51 2010
@@ -178,12 +178,6 @@
// supplied as input has the updates.
ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
storeJob.getConfiguration());
-
- // Before delegating calls to underlying OutputFormat or OutputCommitter
- // Pig needs to ensure the Configuration in the JobContext contains
- // StoreFunc for the specific store - so set this up in the context
- // for this specific store
- updateContextWithStoreInfo(jobContext, store);
}
@Override
@@ -240,21 +234,4 @@
// will wrap the real OutputCommitter(s) belonging to the store(s)
return new PigOutputCommitter(taskattemptcontext);
}
-
- /**
- * Before delegating calls to underlying OutputFormat or OutputCommitter
- * Pig needs to ensure the Configuration in the {@link JobContext} contains
- * {@link JobControlCompiler#PIG_STORE_FUNC}. This helper method can be
- * used to set this up
- * @param context the job context
- * @param store the POStore whose information is to be put into the context
- * @throws IOException in case of failure
- */
- public static void updateContextWithStoreInfo(JobContext context,
- POStore store) throws IOException {
- Configuration conf = context.getConfiguration();
- conf.set(JobControlCompiler.PIG_STORE_FUNC,
- store.getSFile().getFuncSpec().toString());
-
- }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Mar 2 01:01:51 2010
@@ -45,6 +45,7 @@
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
@@ -380,14 +381,15 @@
}
@Override
- public String[] getPartitionKeys(String location, Configuration conf)
+ public String[] getPartitionKeys(String location, Job job)
throws IOException {
return null;
}
@Override
- public ResourceSchema getSchema(String location, Configuration conf)
+ public ResourceSchema getSchema(String location, Job job)
throws IOException {
+ Configuration conf = job.getConfiguration();
Properties props = ConfigurationUtil.toProperties(conf);
// since local mode now is implemented as hadoop's local mode
// we can treat either local or hadoop mode as hadoop mode - hence
@@ -423,7 +425,7 @@
}
@Override
- public ResourceStatistics getStatistics(String location, Configuration conf)
+ public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
throw new UnsupportedOperationException();
}
@@ -437,4 +439,9 @@
public void setStoreFuncUDFContextSignature(String signature) {
}
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ StoreFunc.cleanupOnFailureImpl(location, job);
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Mar 2 01:01:51 2010
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@@ -45,6 +46,7 @@
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -135,7 +137,7 @@
}
- ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
+ protected ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
@Override
public void putNext(Tuple f) throws IOException {
@@ -293,4 +295,10 @@
public void setStoreFuncUDFContextSignature(String signature) {
}
+ @Override
+ public void cleanupOnFailure(String location, Job job)
+ throws IOException {
+ StoreFunc.cleanupOnFailureImpl(location, job);
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Mar 2 01:01:51 2010
@@ -47,6 +47,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
public class LOLoad extends RelationalOperator {
private static final long serialVersionUID = 2L;
@@ -164,7 +165,7 @@
if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) {
LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc;
ResourceSchema rSchema = loadMetadata.getSchema(
- mInputFileSpec.getFileName(), conf);
+ mInputFileSpec.getFileName(), new Job(conf));
return Schema.getPigSchema(rSchema);
} else {
return null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Tue Mar 2 01:01:51 2010
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
@@ -128,7 +129,7 @@
loadMetadata = (LoadMetadata)loadFunc;
try {
partitionKeys = loadMetadata.getPartitionKeys(
- loLoad.getInputFile().getFileName(), loLoad.getConfiguration());
+ loLoad.getInputFile().getFileName(), new Job(loLoad.getConfiguration()));
if(partitionKeys == null || partitionKeys.length == 0) {
return false;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java Tue Mar 2 01:01:51 2010
@@ -56,19 +56,25 @@
*/
public LogicalPlanValidationExecutor(LogicalPlan plan,
- PigContext pigContext) {
+ PigContext pigContext,
+ boolean beforeOptimizer) {
// Default validations
- if (!pigContext.inExplain) {
+ if (!pigContext.inExplain && !beforeOptimizer) {
// When running explain we don't want to check for input
- // files.
+ // files. Run this validator after the optimizer for two reasons:
+ // 1) input/output may get changed (optimized away)
+ // 2) we will call checkSchema on the StoreFunc for the store(s)
+ // in this validator, and it should be given the correct schema
+ // after optimization
validatorList.add(new InputOutputFileValidator(pigContext)) ;
+ } else if (beforeOptimizer) {
+ // This one has to be done before the type checker.
+ //validatorList.add(new TypeCastInserterValidator()) ;
+ validatorList.add(new TypeCheckingValidator()) ;
+
+ validatorList.add(new SchemaAliasValidator()) ;
}
- // This one has to be done before the type checker.
- //validatorList.add(new TypeCastInserterValidator()) ;
- validatorList.add(new TypeCheckingValidator()) ;
-
- validatorList.add(new SchemaAliasValidator()) ;
}
public void validate(LogicalPlan plan,
Modified: hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Tue Mar 2 01:01:51 2010
@@ -214,13 +214,19 @@
CompilationMessageCollector collector = new CompilationMessageCollector();
FrontendException caught = null;
try {
+ boolean isBeforeOptimizer = true;
LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
- plan, pigContext);
+ plan, pigContext, isBeforeOptimizer);
validator.validate(plan, collector);
FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
plan);
optimizer.optimize();
+
+ isBeforeOptimizer = false;
+ validator = new LogicalPlanValidationExecutor(
+ plan, pigContext, isBeforeOptimizer);
+ validator.validate(plan, collector);
} catch (FrontendException fe) {
// Need to go through and see what the collector has in it. But
// remember what we've caught so we can wrap it into what we
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java Tue Mar 2 01:01:51 2010
@@ -18,13 +18,18 @@
package org.apache.pig.test;
import java.io.* ;
+import java.util.Iterator;
import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -37,7 +42,10 @@
import org.apache.pig.impl.logicalLayer.validators.* ;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.LogUtils;
import org.junit.Test;
+
+import junit.framework.Assert;
import junit.framework.TestCase;
public class TestInputOutputFileValidator extends TestCase {
@@ -57,7 +65,8 @@
LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getFs()) ;
CompilationMessageCollector collector = new CompilationMessageCollector() ;
- LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+ boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+ LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
executor.validate(plan, collector) ;
assertFalse(collector.hasError()) ;
@@ -77,7 +86,8 @@
LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;
CompilationMessageCollector collector = new CompilationMessageCollector() ;
- LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+ boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+ LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
try {
executor.validate(plan, collector) ;
fail("Expected to fail.");
@@ -104,7 +114,8 @@
LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;
CompilationMessageCollector collector = new CompilationMessageCollector() ;
- LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+ boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+ LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
executor.validate(plan, collector) ;
assertFalse(collector.hasError()) ;
@@ -123,7 +134,8 @@
LogicalPlan plan = genNewLoadStorePlan(inputfile, outputfile, ctx.getDfs()) ;
CompilationMessageCollector collector = new CompilationMessageCollector() ;
- LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx) ;
+ boolean isBeforeOptimizer = false; // we are not optimizing in this testcase
+ LogicalPlanValidationExecutor executor = new LogicalPlanValidationExecutor(plan, ctx, isBeforeOptimizer) ;
try {
executor.validate(plan, collector) ;
fail("Excepted to fail.");
@@ -137,6 +149,93 @@
}
}
+
+ /**
+ * Testcase to ensure Input output validation allows store to a location
+ * that does not exist when using {@link PigServer#store(String, String)}
+ * @throws Exception
+ */
+ @Test
+ public void testPigServerStore() throws Exception {
+ String input = "input.txt";
+ String output= "output.txt";
+ String data[] = new String[] {"hello\tworld"};
+ ExecType[] modes = new ExecType[] {ExecType.MAPREDUCE, ExecType.LOCAL};
+ PigServer pig = null;
+ for (ExecType execType : modes) {
+ try {
+ if(execType == ExecType.MAPREDUCE) {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ Properties props = new Properties();
+ props.put(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ pig = new PigServer(ExecType.LOCAL, props);
+ }
+ // reinitialize FileLocalizer for each mode
+ // this is needed for the tmp file creation as part of
+ // PigServer.openIterator
+ FileLocalizer.setInitialized(false);
+ Util.deleteFile(pig.getPigContext(), input);
+ Util.deleteFile(pig.getPigContext(), output);
+ Util.createInputFile(pig.getPigContext(), input, data);
+ pig.registerQuery("a = load '" + input + "';");
+ pig.store("a", output);
+ pig.registerQuery("b = load '" + output + "';");
+ Iterator<Tuple> it = pig.openIterator("b");
+ Tuple t = it.next();
+ Assert.assertEquals("hello", t.get(0).toString());
+ Assert.assertEquals("world", t.get(1).toString());
+ Assert.assertEquals(false, it.hasNext());
+ } finally {
+ Util.deleteFile(pig.getPigContext(), input);
+ Util.deleteFile(pig.getPigContext(), output);
+ }
+ }
+ }
+
+ /**
+ * Test case to test that Input output file validation catches the case
+ * where the output file exists when using
+ * {@link PigServer#store(String, String)}
+ * @throws Exception
+ */
+ @Test
+ public void testPigServerStoreNeg() throws Exception {
+ String input = "input.txt";
+ String output= "output.txt";
+ String data[] = new String[] {"hello\tworld"};
+ ExecType[] modes = new ExecType[] {ExecType.MAPREDUCE, ExecType.LOCAL};
+ PigServer pig = null;
+ for (ExecType execType : modes) {
+ try {
+ boolean exceptionCaught = false;
+ if(execType == ExecType.MAPREDUCE) {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ Properties props = new Properties();
+ props.put(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ pig = new PigServer(ExecType.LOCAL, props);
+ }
+ Util.deleteFile(pig.getPigContext(), input);
+ Util.deleteFile(pig.getPigContext(), output);
+ Util.createInputFile(pig.getPigContext(), input, data);
+ Util.createInputFile(pig.getPigContext(), output, data);
+ try {
+ pig.registerQuery("a = load '" + input + "';");
+ pig.store("a", output);
+ } catch (Exception e) {
+ assertEquals(6000, LogUtils.getPigException(e).getErrorCode());
+ exceptionCaught = true;
+ }
+ if(!exceptionCaught) {
+ Assert.fail("Expected exception to be caught");
+ }
+ } finally {
+ Util.deleteFile(pig.getPigContext(), input);
+ Util.deleteFile(pig.getPigContext(), output);
+ }
+ }
+ }
private LogicalPlan genNewLoadStorePlan(String inputFile,
String outputFile, DataStorage dfs)
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Tue Mar 2 01:01:51 2010
@@ -302,13 +302,19 @@
CompilationMessageCollector collector = new CompilationMessageCollector();
FrontendException caught = null;
try {
+ boolean isBeforeOptimizer = true;
LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
- plan, pigContext);
+ plan, pigContext, isBeforeOptimizer);
validator.validate(plan, collector);
FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
plan);
optimizer.optimize();
+
+ isBeforeOptimizer = false;
+ validator = new LogicalPlanValidationExecutor(
+ plan, pigContext, isBeforeOptimizer);
+ validator.validate(plan, collector);
} catch (FrontendException fe) {
// Need to go through and see what the collector has in it. But
// remember what we've caught so we can wrap it into what we
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Tue Mar 2 01:01:51 2010
@@ -505,20 +505,20 @@
}
@Override
- public String[] getPartitionKeys(String location, Configuration conf)
+ public String[] getPartitionKeys(String location, Job job)
throws IOException {
return partCols;
}
@Override
- public ResourceSchema getSchema(String location, Configuration conf)
+ public ResourceSchema getSchema(String location, Job job)
throws IOException {
return new ResourceSchema(schema);
}
@Override
public ResourceStatistics getStatistics(String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
return null;
}