You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/07 23:21:09 UTC

svn commit: r1382178 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/hbase/ src/org/apache/pig/builtin/ src/org/apache/pig/builtin/mock/ src/org/apache/pig/impl/io/ test/org/apache/pig/

Author: gates
Date: Fri Sep  7 21:21:08 2012
New Revision: 1382178

URL: http://svn.apache.org/viewvc?rev=1382178&view=rev
Log:
PIG-1891 Enable StoreFunc to make intelligent decision based on job success or failure

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/StoreFunc.java
    pig/trunk/src/org/apache/pig/StoreFuncInterface.java
    pig/trunk/src/org/apache/pig/StoreFuncWrapper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
    pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
    pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
    pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
    pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java
    pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Sep  7 21:21:08 2012
@@ -21,6 +21,7 @@ Pig Change Log
 Trunk (unreleased changes)
 
 INCOMPATIBLE CHANGES
+PIG-1891 Enable StoreFunc to make intelligent decision based on job success or failure (initialcontext via gates)
 
 IMPROVEMENTS
 

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Sep  7 21:21:08 2012
@@ -1288,9 +1288,17 @@ public class PigServer {
                 } catch (IOException e) {
                     throw new ExecException(e);
                 }
+            } else {
+                POStore store = output.getPOStore();
+                try {
+                    store.getStoreFunc().cleanupOnSuccess(
+                            store.getSFile().getFileName(),
+                            new Job(output.getConf()));
+                } catch (IOException e) {
+                    throw new ExecException(e);
+                }
             }
         }
-
         return stats;
     }
 

Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Fri Sep  7 21:21:08 2012
@@ -159,9 +159,28 @@ public abstract class StoreFunc implemen
     throws IOException {
         cleanupOnFailureImpl(location, job);
     }
+
+    /**
+     * This method will be called by Pig if the job which contains this store
+     * is successful. Implementations can release temporary or intermediate
+     * resources here; unlike {@link #cleanupOnFailure(String, Job)}, they should
+     * normally leave the stored output in place, since it is the job's result.
+     * @param location Location returned by
+     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
+     * @throws IOException if the cleanup fails
+     */
+    @Override
+    public void cleanupOnSuccess(String location, Job job)
+    throws IOException {
+        // Default: no-op. Subclasses override this to perform post-success cleanup.
+    }
     
     /**
-     * Implementation for {@link #cleanupOnFailure(String, Job)}.  This removes a file
+     * Default implementation for {@link #cleanupOnFailure(String, Job)}; overrides of
+     * {@link #cleanupOnSuccess(String, Job)} may also call it.  This removes a file
      * from HDFS.
      * @param location file name (or URI) of file to remove
      * @param job Hadoop job, used to access the appropriate file system.

Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Fri Sep  7 21:21:08 2012
@@ -139,4 +139,17 @@ public interface StoreFuncInterface {
      * any runtime job information. 
      */
     void cleanupOnFailure(String location, Job job) throws IOException;
+
+    /**
+     * This method will be called by Pig if the job which contains this store
+     * is successful. Implementations can release temporary or intermediate
+     * resources here; unlike {@link #cleanupOnFailure(String, Job)}, they should
+     * normally leave the stored output in place, since it is the job's result.
+     * @param location Location returned by
+     * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
+     */
+    void cleanupOnSuccess(String location, Job job) throws IOException;
 }

Modified: pig/trunk/src/org/apache/pig/StoreFuncWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncWrapper.java Fri Sep  7 21:21:08 2012
@@ -87,6 +87,11 @@ public class StoreFuncWrapper implements
         storeFunc().cleanupOnFailure(location, job);
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        this.storeFunc().cleanupOnSuccess(location, job); // forward to the wrapped StoreFunc
+    }
+
     private StoreFuncInterface storeFunc() {
         if (this.storeFunc == null) {
             // Pig does not re-throw the exception with a stack trace in the parse phase.

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Sep  7 21:21:08 2012
@@ -887,6 +887,11 @@ public class HBaseStorage extends LoadFu
     public void cleanupOnFailure(String location, Job job) throws IOException {
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // Intentionally empty, like cleanupOnFailure above.
+    }
+
     /*
      * LoadPushDown Methods.
      */

Modified: pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri Sep  7 21:21:08 2012
@@ -442,4 +442,8 @@ implements StoreFuncInterface, LoadMetad
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // Intentionally a no-op: nothing to clean up after a successful store.
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Fri Sep  7 21:21:08 2012
@@ -453,6 +453,13 @@ LoadPushDown, LoadMetadata, StoreMetadat
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job)
+    throws IOException {
+        // Intentionally empty: PigStorage performs no post-success cleanup.
+        // (cleanupOnFailure, above, delegates to StoreFunc.cleanupOnFailureImpl.)
+    }
+
 
     //------------------------------------------------------------------------
     // Implementation of LoadMetaData interface

Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Fri Sep  7 21:21:08 2012
@@ -455,6 +455,11 @@ private MockRecordWriter mockRecordWrite
 	init(location, job);
   }
 
+  @Override
+  public void cleanupOnSuccess(String location, Job job) throws IOException {
+	init(location, job); // NOTE(review): same call as the preceding hook; presumably (re)binds mock data for location - confirm
+  }
+
   // StoreMetaData
   
   @Override

Modified: pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Fri Sep  7 21:21:08 2012
@@ -203,4 +203,8 @@ implements StoreFuncInterface, LoadMetad
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // No-op: a successful store of intermediate data needs no cleanup here.
+    }
 }

Modified: pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java Fri Sep  7 21:21:08 2012
@@ -206,4 +206,8 @@ public class TFileStorage extends FileIn
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // No-op: only the failure path removes output (see cleanupOnFailure).
+    }
 }

Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Fri Sep  7 21:21:08 2012
@@ -340,7 +340,7 @@ public class TestLoadStoreFuncLifeCycle 
         assertEquals("c", out.get(2).get(0));
 
         assertTrue("loader instanciation count increasing: " + Loader.count, Loader.count <= 3);
-        assertTrue("storer instanciation count increasing: " + Storer.count, Storer.count <= 3);
+        assertTrue("storer instanciation count increasing: " + Storer.count, Storer.count <= 4);
 
     }
 

Modified: pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java (original)
+++ pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java Fri Sep  7 21:21:08 2012
@@ -66,6 +66,9 @@ public class TestStoreFuncMetadataWrappe
         wrapper.setStoreFuncUDFContextSignature(null);
         assertEquals("setStoreFuncUDFContextSignature", storeFunc.getLastMethodCalled());
 
+        wrapper.cleanupOnSuccess(null, null);
+        assertEquals("cleanupOnSuccess", storeFunc.getLastMethodCalled());
+
         wrapper.cleanupOnFailure(null, null);
         assertEquals("cleanupOnFailure", storeFunc.getLastMethodCalled());
 

Modified: pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java (original)
+++ pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java Fri Sep  7 21:21:08 2012
@@ -63,6 +63,9 @@ public class TestStoreFuncWrapper {
 
         wrapper.setStoreFuncUDFContextSignature(null);
         assertEquals("setStoreFuncUDFContextSignature", storeFunc.getLastMethodCalled());
+  
+        wrapper.cleanupOnSuccess(null, null);
+        assertEquals("cleanupOnSuccess", storeFunc.getLastMethodCalled());
 
         wrapper.cleanupOnFailure(null, null);
         assertEquals("cleanupOnFailure", storeFunc.getLastMethodCalled());
@@ -150,6 +153,11 @@ public class TestStoreFuncWrapper {
         }
 
         @Override
+        public void cleanupOnSuccess(String location, Job job) throws IOException {
+            setLastMethodCalled(); // lets the test assert the wrapper delegated to cleanupOnSuccess
+        }
+
+        @Override
         public void cleanupOnFailure(String location, Job job) throws IOException {
             setLastMethodCalled();
         }