You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/04 21:00:53 UTC

svn commit: r1564451 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: daijy
Date: Tue Feb  4 20:00:52 2014
New Revision: 1564451

URL: http://svn.apache.org/r1564451
Log:
PIG-259: allow store to overwrite existing directory

Added:
    pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
    pig/trunk/test/org/apache/pig/test/TestPigStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb  4 20:00:52 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-259: allow store to overwrite existing directory (nezihyigitbasi via daijy)
+
 PIG-2672: Optimize the use of DistributedCache (aniket486)
 
 PIG-3238: Pig current releases lack a UDF Stuff(). This UDF deletes a specified length of characters

Added: pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java?rev=1564451&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java (added)
+++ pig/trunk/src/org/apache/pig/OverwritingStoreFunc.java Tue Feb  4 20:00:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+
+/**
+ * Implemented by a storefunc that can clean up existing output before writing data (overwrite support).
+ */
+public interface OverwritingStoreFunc {
+    /**
+     * @return true if existing output should be cleaned up before the store runs
+     */
+    public boolean isOverwrite();
+
+    /**
+     * Cleans up the old output; the job compiler calls this only when {@link #isOverwrite()} returns true.
+     * 
+     * @param store
+     *            the store operator whose existing output should be deleted
+     * @param job
+     *            the job being configured; passed so the deletion can use it
+     * @throws IOException if the old output cannot be cleaned up
+     */
+    public void cleanupOutput(POStore store, Job job) throws IOException;
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb  4 20:00:52 2014
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapred.jobcontr
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.OverwritingStoreFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
@@ -660,16 +661,28 @@ public class JobControlCompiler{
             LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
             LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
 
-            for (POStore st: mapStores) {
+            for (POStore st : mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (sFunc instanceof OverwritingStoreFunc) {
+                    OverwritingStoreFunc osf = (OverwritingStoreFunc) sFunc;
+                    if (osf.isOverwrite()) {
+                        osf.cleanupOutput(st, nwJob);
+                    }
+                }
             }
 
-            for (POStore st: reduceStores) {
+            for (POStore st : reduceStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (sFunc instanceof OverwritingStoreFunc) {
+                    OverwritingStoreFunc osf = (OverwritingStoreFunc) sFunc;
+                    if (osf.isOverwrite()) {
+                        osf.cleanupOutput(st, nwJob);
+                    }
+                }
             }
 
             // the OutputFormat we report to Hadoop is always PigOutputFormat which

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Feb  4 20:00:52 2014
@@ -22,18 +22,22 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.OverwritingStoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -204,7 +208,21 @@ public class PigOutputFormat extends Out
             
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
-            of.checkOutputSpecs(jobContextCopy);
+            try {
+                of.checkOutputSpecs(jobContextCopy);
+            } catch (IOException ioe) {
+                boolean shouldThrowException = true;
+                if (sFunc instanceof OverwritingStoreFunc) {
+                    if (((OverwritingStoreFunc) sFunc).isOverwrite()) {
+                        if (ioe instanceof FileAlreadyExistsException
+                                || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+                            shouldThrowException = false;
+                        }
+                    }
+                }
+                if (shouldThrowException)
+                    throw ioe;
+            }
         }
     }
     /**

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Feb  4 20:00:52 2014
@@ -27,10 +27,14 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
@@ -50,6 +54,7 @@ import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
+import org.apache.pig.OverwritingStoreFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -61,10 +66,12 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.CastUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -125,7 +132,7 @@ import org.apache.pig.parser.ParserExcep
  */
 @SuppressWarnings("unchecked")
 public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
-LoadPushDown, LoadMetadata, StoreMetadata {
+LoadPushDown, LoadMetadata, StoreMetadata, OverwritingStoreFunc {
     protected RecordReader in = null;
     protected RecordWriter writer = null;
     protected final Log mLog = LogFactory.getLog(getClass());
@@ -138,6 +145,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
 
     boolean isSchemaOn = false;
     boolean dontLoadSchema = false;
+    boolean overwriteOutput = false;
     protected ResourceSchema schema;
     protected LoadCaster caster;
 
@@ -161,6 +169,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
         validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
         validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
         validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
+        Option overwrite = OptionBuilder.hasOptionalArgs(1).withArgName("overwrite").withLongOpt("overwrite").withDescription("Overwrites the destination.").create();
+        validOptions.addOption(overwrite);        
     }
 
     public PigStorage() {
@@ -200,6 +210,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
         try {
             configuredOptions = parser.parse(validOptions, optsArr);
             isSchemaOn = configuredOptions.hasOption("schema");
+            if (configuredOptions.hasOption("overwrite")) {
+                String value = configuredOptions.getOptionValue("overwrite");
+                if ("true".equalsIgnoreCase(value)) {
+                    overwriteOutput = true;
+                }
+            }       
             dontLoadSchema = configuredOptions.hasOption("noschema");
             tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
             tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -568,4 +584,24 @@ LoadPushDown, LoadMetadata, StoreMetadat
             Job job) throws IOException {
 
     }
+
+    @Override
+    public boolean isOverwrite() {
+        return this.overwriteOutput; // false unless the "overwrite" option was given the value "true"
+    }
+
+    @Override
+    public void cleanupOutput(POStore store, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        String output = conf.get("mapred.output.dir");
+        if (output == null)
+            return; // no output location configured: nothing to delete (previously an NPE)
+        Path outputPath = new Path(output);
+        FileSystem fs = outputPath.getFileSystem(conf);
+        try {
+            fs.delete(outputPath, true); // recursive; best effort, a failure is only logged
+        } catch (Exception e) {
+            mLog.warn("Could not delete output " + output, e); // keep the cause in the log
+        }
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Tue Feb  4 20:00:52 2014
@@ -19,7 +19,9 @@ package org.apache.pig.newplan.logical.r
 
 import java.io.IOException;
 
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.OverwritingStoreFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -91,8 +93,22 @@ public class InputOutputFileValidator {
                     errCode = 4000;
                     break;
                 }
-                validationErrStr  += ioe.getMessage();
-                throw new VisitorException(store, validationErrStr, errCode, errSrc, ioe);
+                
+                boolean shouldThrowException = true;
+                if (sf instanceof OverwritingStoreFunc) {
+                    if (((OverwritingStoreFunc) sf).isOverwrite()) {
+                        if (ioe instanceof FileAlreadyExistsException
+                                || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+                            shouldThrowException = false;
+                        }
+                    }
+                }
+                if (shouldThrowException) {
+                    validationErrStr += ioe.getMessage();
+                    throw new VisitorException(store, validationErrStr,
+                            errCode, errSrc, ioe);
+                }
+
             } catch (InterruptedException ie) {
                 validationErrStr += ie.getMessage();
                 throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ie);

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1564451&r1=1564450&r2=1564451&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Tue Feb  4 20:00:52 2014
@@ -669,4 +669,46 @@ public class TestPigStorage  {
         assertEquals(tuple(null,null), it.next());
         assertFalse(it.hasNext());
     }
+
+
+    @Test
+    public void testPigStorageSchemaWithOverwrite() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir
+                + "originput' using PigStorage(',') "
+                + "as (f1:chararray, f2:int);";
+
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "('A',1L)",
+                        "('B',2L)", "('C',3L)", "('D',2L)", "('A',5L)",
+                        "('B',5L)", "('C',8L)", "('A',8L)", "('D',8L)",
+                        "('A',9L)", });
+
+        pig.registerQuery(query);
+        pig.store("a", datadir + "aout", "PigStorage(',')");
+        // second store targets the same 'aout' location: with overwrite it shouldn't fail & the result stays the same
+        pig.store("a", datadir + "aout", "PigStorage(',', '--overwrite true')");
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage(',');");
+        Iterator<Tuple> iter = pig.openIterator("b");
+        int counter = 0;
+        while (iter.hasNext()) {
+            String tuple = iter.next().toString();
+            Assert.assertEquals(expectedResults.get(counter++).toString(),
+                    tuple);
+        }
+        Assert.assertEquals(expectedResults.size(), counter); // every expected tuple was read back
+
+    }
+
+    @Test(expected = Exception.class)
+    public void testPigStorageSchemaFailureWithoutOverwrite() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir + "originput' using PigStorage(',') "
+                + "as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        // storing twice to the same 'aout' location should fail without the overwrite flag
+        pig.store("a", datadir + "aout", "PigStorage(',')");
+        pig.store("a", datadir + "aout", "PigStorage(',')");
+    }
+    
 }