You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/04 21:00:53 UTC
svn commit: r1564451 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/rules/
test/org/apache/pig/test/
Author: daijy
Date: Tue Feb 4 20:00:52 2014
New Revision: 1564451
URL: http://svn.apache.org/r1564451
Log:
PIG-259: allow store to overwrite existing directory
Added:
pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb 4 20:00:52 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-259: allow store to overwrite existing directory (nezihyigitbasi via daijy)
+
PIG-2672: Optimize the use of DistributedCache (aniket486)
PIG-3238: Pig current releases lack a UDF Stuff(). This UDF deletes a specified length of characters
Added: pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java?rev=1564451&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java (added)
+++ pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java Tue Feb 4 20:00:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+
+/**
+ * Optional mix-in for a store function that can replace an existing output
+ * location instead of failing because that location already exists.
+ *
+ * <p>When {@link #isOverwrite()} returns {@code true}, Pig's output-location
+ * validation and {@code PigOutputFormat.checkOutputSpecs} tolerate a
+ * {@code FileAlreadyExistsException} for this store, and the job compiler calls
+ * {@link #cleanupOutput(POStore, Job)} right after {@code setStoreLocation}.
+ */
+public interface OverwritingStoreFunc {
+    /**
+     * Reports whether this store function is configured to overwrite existing output.
+     *
+     * @return {@code true} if existing output should be removed and replaced;
+     *         {@code false} if the usual "output already exists" check applies
+     */
+    boolean isOverwrite();
+
+    /**
+     * Removes the old output so the store can write a fresh copy. Pig's job
+     * compiler invokes this only when {@link #isOverwrite()} returns {@code true}.
+     *
+     * @param store
+     *            the store operator whose output is about to be (re)written
+     * @param job
+     *            the job being configured for this store; implementations may
+     *            read the output location and file-system settings from it
+     * @throws IOException
+     *             if the cleanup cannot be carried out
+     */
+    void cleanupOutput(POStore store, Job job) throws IOException;
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb 4 20:00:52 2014
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapred.jobcontr
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.OverwritingStoreFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
@@ -660,16 +661,28 @@ public class JobControlCompiler{
LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
- for (POStore st: mapStores) {
+ for (POStore st : mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (sFunc instanceof OverwritingStoreFunc) {
+ OverwritingStoreFunc osf = (OverwritingStoreFunc) sFunc;
+ if (osf.isOverwrite()) {
+ osf.cleanupOutput(st, nwJob);
+ }
+ }
}
- for (POStore st: reduceStores) {
+ for (POStore st : reduceStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (sFunc instanceof OverwritingStoreFunc) {
+ OverwritingStoreFunc osf = (OverwritingStoreFunc) sFunc;
+ if (osf.isOverwrite()) {
+ osf.cleanupOutput(st, nwJob);
+ }
+ }
}
// the OutputFormat we report to Hadoop is always PigOutputFormat which
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Feb 4 20:00:52 2014
@@ -22,18 +22,22 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.OverwritingStoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -204,7 +208,21 @@ public class PigOutputFormat extends Out
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
- of.checkOutputSpecs(jobContextCopy);
+ try {
+ of.checkOutputSpecs(jobContextCopy);
+ } catch (IOException ioe) {
+ boolean shouldThrowException = true;
+ if (sFunc instanceof OverwritingStoreFunc) {
+ if (((OverwritingStoreFunc) sFunc).isOverwrite()) {
+ if (ioe instanceof FileAlreadyExistsException
+ || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+ shouldThrowException = false;
+ }
+ }
+ }
+ if (shouldThrowException)
+ throw ioe;
+ }
}
}
/**
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Feb 4 20:00:52 2014
@@ -27,10 +27,14 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
@@ -50,6 +54,7 @@ import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
+import org.apache.pig.OverwritingStoreFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -61,10 +66,12 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -125,7 +132,7 @@ import org.apache.pig.parser.ParserExcep
*/
@SuppressWarnings("unchecked")
public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
-LoadPushDown, LoadMetadata, StoreMetadata {
+LoadPushDown, LoadMetadata, StoreMetadata, OverwritingStoreFunc {
protected RecordReader in = null;
protected RecordWriter writer = null;
protected final Log mLog = LogFactory.getLog(getClass());
@@ -138,6 +145,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
boolean isSchemaOn = false;
boolean dontLoadSchema = false;
+ boolean overwriteOutput = false;
protected ResourceSchema schema;
protected LoadCaster caster;
@@ -161,6 +169,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
+ Option overwrite = OptionBuilder.hasOptionalArgs(1).withArgName("overwrite").withLongOpt("overwrite").withDescription("Overwrites the destination.").create();
+ validOptions.addOption(overwrite);
}
public PigStorage() {
@@ -200,6 +210,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
try {
configuredOptions = parser.parse(validOptions, optsArr);
isSchemaOn = configuredOptions.hasOption("schema");
+ if (configuredOptions.hasOption("overwrite")) {
+ String value = configuredOptions.getOptionValue("overwrite");
+ if ("true".equalsIgnoreCase(value)) {
+ overwriteOutput = true;
+ }
+ }
dontLoadSchema = configuredOptions.hasOption("noschema");
tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -568,4 +584,24 @@ LoadPushDown, LoadMetadata, StoreMetadat
Job job) throws IOException {
}
+
+    /**
+     * Tells Pig whether this storer replaces an existing output location. The flag
+     * is turned on only by the {@code --overwrite} option with the value
+     * {@code true} (compared case-insensitively).
+     *
+     * @return the configured overwrite flag
+     */
+    @Override
+    public boolean isOverwrite() {
+        return overwriteOutput;
+    }
+
+    /**
+     * Deletes the existing output location (recursively) so the store can overwrite it.
+     *
+     * <p>The location is read from the job configuration key {@code mapred.output.dir}.
+     * Deletion is best-effort: a failing delete is logged, not propagated.
+     *
+     * @param store the store being prepared; not consulted by this implementation
+     * @param job the job whose configuration names the output location
+     * @throws IOException if the file system for the output path cannot be obtained
+     */
+    @Override
+    public void cleanupOutput(POStore store, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        // NOTE(review): JobControlCompiler calls this right after setStoreLocation,
+        // which presumably sets this key for the current store -- confirm.
+        String output = conf.get("mapred.output.dir");
+        if (output == null || output.isEmpty()) {
+            // Without a location there is nothing to delete; the previous code
+            // dereferenced a null Path here and failed with an NPE.
+            mLog.warn("Could not delete output " + output);
+            return;
+        }
+        Path outputPath = new Path(output);
+        FileSystem fs = outputPath.getFileSystem(conf);
+        try {
+            fs.delete(outputPath, true);
+        } catch (IOException e) {
+            // Still best-effort, but keep the cause so failures can be diagnosed.
+            mLog.warn("Could not delete output " + output, e);
+        }
+    }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Tue Feb 4 20:00:52 2014
@@ -19,7 +19,9 @@ package org.apache.pig.newplan.logical.r
import java.io.IOException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.OverwritingStoreFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
@@ -91,8 +93,22 @@ public class InputOutputFileValidator {
errCode = 4000;
break;
}
- validationErrStr += ioe.getMessage();
- throw new VisitorException(store, validationErrStr, errCode, errSrc, ioe);
+
+ boolean shouldThrowException = true;
+ if (sf instanceof OverwritingStoreFunc) {
+ if (((OverwritingStoreFunc) sf).isOverwrite()) {
+ if (ioe instanceof FileAlreadyExistsException
+ || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+ shouldThrowException = false;
+ }
+ }
+ }
+ if (shouldThrowException) {
+ validationErrStr += ioe.getMessage();
+ throw new VisitorException(store, validationErrStr,
+ errCode, errSrc, ioe);
+ }
+
} catch (InterruptedException ie) {
validationErrStr += ie.getMessage();
throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ie);
Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Tue Feb 4 20:00:52 2014
@@ -669,4 +669,46 @@ public class TestPigStorage {
assertEquals(tuple(null,null), it.next());
assertFalse(it.hasNext());
}
+
+
+    /**
+     * Storing the same alias twice to one location succeeds when the second store
+     * passes {@code --overwrite true}, and the location then holds exactly one copy
+     * of the data (the final size check would catch appended duplicates).
+     */
+    @Test
+    public void testPigStorageSchemaWithOverwrite() throws Exception {
+        pigContext.connect();
+        String loadStmt = "a = LOAD '" + datadir + "originput' using PigStorage(',') "
+                + "as (f1:chararray, f2:int);";
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "('A',1L)", "('B',2L)", "('C',3L)", "('D',2L)", "('A',5L)",
+                "('B',5L)", "('C',8L)", "('A',8L)", "('D',8L)", "('A',9L)", });
+
+        pig.registerQuery(loadStmt);
+        String target = datadir + "aout";
+        pig.store("a", target, "PigStorage(',')");
+        // The second store to the same target must not fail, and must replace the data.
+        pig.store("a", target, "PigStorage(',', '--overwrite true')");
+
+        pig.registerQuery("b = LOAD '" + target + "' using PigStorage(',');");
+        Iterator<Tuple> actual = pig.openIterator("b");
+        int seen = 0;
+        for (; actual.hasNext(); seen++) {
+            String got = actual.next().toString();
+            Assert.assertEquals(expected.get(seen).toString(), got);
+        }
+        Assert.assertEquals(expected.size(), seen);
+    }
+
+    /**
+     * Storing to an existing location without {@code --overwrite} must be rejected.
+     * The first store sits outside the failure expectation, so an unrelated failure
+     * there (e.g. missing input) can no longer make this test pass.
+     */
+    @Test
+    public void testPigStorageSchemaFailureWithoutOverwrite() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir + "originput' using PigStorage(',') "
+                + "as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        // NOTE(review): assumes per-test setup starts from a clean datadir so that
+        // "aout" does not exist yet -- confirm against the @Before method.
+        pig.store("a", datadir + "aout", "PigStorage(',')");
+        try {
+            // should fail without the overwrite flag
+            pig.store("a", datadir + "aout", "PigStorage(',')");
+            Assert.fail("Expected a store to an existing location without --overwrite to fail");
+        } catch (Exception expected) {
+            // Expected: the output location already exists. Any Exception is accepted,
+            // matching the original expectation; AssertionError from fail() is not caught.
+        }
+    }
+
}