You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/07 23:21:09 UTC
svn commit: r1382178 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/hbase/ src/org/apache/pig/builtin/
src/org/apache/pig/builtin/mock/ src/org/apache/pig/impl/io/
test/org/apache/pig/
Author: gates
Date: Fri Sep 7 21:21:08 2012
New Revision: 1382178
URL: http://svn.apache.org/viewvc?rev=1382178&view=rev
Log:
PIG-1891 Enable StoreFunc to make intelligent decision based on job success or failure
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/StoreFunc.java
pig/trunk/src/org/apache/pig/StoreFuncInterface.java
pig/trunk/src/org/apache/pig/StoreFuncWrapper.java
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/src/org/apache/pig/builtin/BinStorage.java
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java
pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Sep 7 21:21:08 2012
@@ -21,6 +21,7 @@ Pig Change Log
Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-1891 Enable StoreFunc to make intelligent decision based on job success or failure (initialcontext via gates)
IMPROVEMENTS
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Sep 7 21:21:08 2012
@@ -1288,9 +1288,17 @@ public class PigServer {
} catch (IOException e) {
throw new ExecException(e);
}
+ } else {
+ POStore store = output.getPOStore();
+ try {
+ store.getStoreFunc().cleanupOnSuccess(
+ store.getSFile().getFileName(),
+ new Job(output.getConf()));
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
}
}
-
return stats;
}
Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Fri Sep 7 21:21:08 2012
@@ -159,9 +159,28 @@ public abstract class StoreFunc implemen
throws IOException {
cleanupOnFailureImpl(location, job);
}
+
+ /**
+ * This method will be called by Pig if the job which contains this store
+ * is successful, and some cleanup of intermediate resources is required.
+ * Implementations can clean up output locations in this method to
+ * ensure that no incorrect/incomplete results are left in the output location.
+ * @param location Location returned by
+ * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
+ */
+ @Override
+ public void cleanupOnSuccess(String location, Job job)
+ throws IOException {
+ // DEFAULT: do nothing. User-defined overrides may, for example,
+ // call cleanupOnFailureImpl(location, job) to remove the file at location.
+ }
/**
- * Implementation for {@link #cleanupOnFailure(String, Job)}. This removes a file
+ * Default implementation for {@link #cleanupOnFailure(String, Job)}; overrides of
+ * {@link #cleanupOnSuccess(String, Job)} may also call it. This removes a file
* from HDFS.
* @param location file name (or URI) of file to remove
* @param job Hadoop job, used to access the appropriate file system.
Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Fri Sep 7 21:21:08 2012
@@ -139,4 +139,17 @@ public interface StoreFuncInterface {
* any runtime job information.
*/
void cleanupOnFailure(String location, Job job) throws IOException;
+
+ /**
+ * This method will be called by Pig if the job which contains this store
+ * is successful, and some cleanup of intermediate resources is required.
+ * Implementations can clean up output locations in this method to
+ * ensure that no incorrect/incomplete results are left in the output location.
+ * @param location Location returned by
+ * {@link StoreFuncInterface#relToAbsPathForStoreLocation(String, Path)}
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
+ */
+ void cleanupOnSuccess(String location, Job job) throws IOException;
}
Modified: pig/trunk/src/org/apache/pig/StoreFuncWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncWrapper.java Fri Sep 7 21:21:08 2012
@@ -87,6 +87,11 @@ public class StoreFuncWrapper implements
storeFunc().cleanupOnFailure(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ storeFunc().cleanupOnSuccess(location, job);
+ }
+
private StoreFuncInterface storeFunc() {
if (this.storeFunc == null) {
// Pig does not re-throw the exception with a stack trace in the parse phase.
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Sep 7 21:21:08 2012
@@ -887,6 +887,11 @@ public class HBaseStorage extends LoadFu
public void cleanupOnFailure(String location, Job job) throws IOException {
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ }
+
+
/*
* LoadPushDown Methods.
*/
Modified: pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri Sep 7 21:21:08 2012
@@ -442,4 +442,8 @@ implements StoreFuncInterface, LoadMetad
StoreFunc.cleanupOnFailureImpl(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ // DEFAULT: do nothing
+ }
}
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Fri Sep 7 21:21:08 2012
@@ -453,6 +453,13 @@ LoadPushDown, LoadMetadata, StoreMetadat
StoreFunc.cleanupOnFailureImpl(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job)
+ throws IOException {
+ // DEFAULT: do nothing
+ }
+
+
//------------------------------------------------------------------------
// Implementation of LoadMetaData interface
Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Fri Sep 7 21:21:08 2012
@@ -455,6 +455,11 @@ private MockRecordWriter mockRecordWrite
init(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ init(location, job);
+ }
+
// StoreMetaData
@Override
Modified: pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Fri Sep 7 21:21:08 2012
@@ -203,4 +203,8 @@ implements StoreFuncInterface, LoadMetad
StoreFunc.cleanupOnFailureImpl(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ // DEFAULT: do nothing
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java Fri Sep 7 21:21:08 2012
@@ -206,4 +206,8 @@ public class TFileStorage extends FileIn
StoreFunc.cleanupOnFailureImpl(location, job);
}
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ // DEFAULT: do nothing
+ }
}
Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Fri Sep 7 21:21:08 2012
@@ -340,7 +340,7 @@ public class TestLoadStoreFuncLifeCycle
assertEquals("c", out.get(2).get(0));
assertTrue("loader instanciation count increasing: " + Loader.count, Loader.count <= 3);
- assertTrue("storer instanciation count increasing: " + Storer.count, Storer.count <= 3);
+ assertTrue("storer instanciation count increasing: " + Storer.count, Storer.count <= 4);
}
Modified: pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java (original)
+++ pig/trunk/test/org/apache/pig/TestStoreFuncMetadataWrapper.java Fri Sep 7 21:21:08 2012
@@ -66,6 +66,9 @@ public class TestStoreFuncMetadataWrappe
wrapper.setStoreFuncUDFContextSignature(null);
assertEquals("setStoreFuncUDFContextSignature", storeFunc.getLastMethodCalled());
+ wrapper.cleanupOnSuccess(null, null);
+ assertEquals("cleanupOnSuccess", storeFunc.getLastMethodCalled());
+
wrapper.cleanupOnFailure(null, null);
assertEquals("cleanupOnFailure", storeFunc.getLastMethodCalled());
Modified: pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java?rev=1382178&r1=1382177&r2=1382178&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java (original)
+++ pig/trunk/test/org/apache/pig/TestStoreFuncWrapper.java Fri Sep 7 21:21:08 2012
@@ -63,6 +63,9 @@ public class TestStoreFuncWrapper {
wrapper.setStoreFuncUDFContextSignature(null);
assertEquals("setStoreFuncUDFContextSignature", storeFunc.getLastMethodCalled());
+
+ wrapper.cleanupOnSuccess(null, null);
+ assertEquals("cleanupOnSuccess", storeFunc.getLastMethodCalled());
wrapper.cleanupOnFailure(null, null);
assertEquals("cleanupOnFailure", storeFunc.getLastMethodCalled());
@@ -150,6 +153,11 @@ public class TestStoreFuncWrapper {
}
@Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ setLastMethodCalled();
+ }
+
+ @Override
public void cleanupOnFailure(String location, Job job) throws IOException {
setLastMethodCalled();
}