You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/09/17 20:13:52 UTC
svn commit: r1524152 - in /pig/trunk: CHANGES.txt conf/pig.properties
src/org/apache/pig/PigConfiguration.java
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
src/pig-default.properties
Author: daijy
Date: Tue Sep 17 18:13:52 2013
New Revision: 1524152
URL: http://svn.apache.org/r1524152
Log:
PIG-3065: pig output format/committer should support recovery for hadoop 0.23
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
pig/trunk/src/pig-default.properties
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 17 18:13:52 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
IMPROVEMENTS
+PIG-3065: pig output format/committer should support recovery for hadoop 0.23 (daijy)
+
PIG-3390: Make pig working with HBase 0.95 (jarcec via daijy)
PIG-3431: Return more information for parsing related exceptions. (jeremykarn via daijy)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Sep 17 18:13:52 2013
@@ -213,3 +213,6 @@ pig.location.check.strict=false
# pig.default.store.func=<fully qualified class name of a StoreFunc implementation>
# For eg, pig.default.store.func=org.apache.pig.custom.MyCustomStorage
+# Whether Pig's output committer supports recovering task output after the
+# application master is restarted (Hadoop 0.23 and later). Default: false.
+# pig.output.committer.recovery.support=true
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Sep 17 18:13:52 2013
@@ -93,4 +93,10 @@ public class PigConfiguration {
* as default in case this is undefined.
*/
public static final String PIG_DEFAULT_STORE_FUNC = "pig.default.store.func";
+
+ /**
+ * Boolean key controlling whether the output committer supports task
+ * recovery after the application master is restarted (default: false).
+ */
+ public static final String PIG_OUTPUT_COMMITTER_RECOVERY = "pig.output.committer.recovery.support";
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Sep 17 18:13:52 2013
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.PigConfiguration;
/**
* A specialization of the default FileOutputCommitter to allow
@@ -54,6 +55,8 @@ public class PigOutputCommitter extends
*/
List<Pair<OutputCommitter, POStore>> reduceOutputCommitters;
+ boolean recoverySupported; // PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, read in the constructor (default false)
+
/**
* @param context
* @param mapStores
@@ -66,7 +69,7 @@ public class PigOutputCommitter extends
// create and store the map and reduce output committers
mapOutputCommitters = getCommitters(context, mapStores);
reduceOutputCommitters = getCommitters(context, reduceStores);
-
+ recoverySupported = context.getConfiguration().getBoolean(PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, false);
}
/**
@@ -145,6 +148,119 @@ public class PigOutputCommitter extends
}
}
+    /**
+     * Reports whether task output can be recovered after the application
+     * master is restarted.
+     * <p>
+     * Recovery is claimed only when it is enabled through
+     * {@link PigConfiguration#PIG_OUTPUT_COMMITTER_RECOVERY} (read in the
+     * constructor, default {@code false}) and every non-null map and reduce
+     * output committer itself reports that it supports recovery.
+     * <p>
+     * Hadoop 1.x's OutputCommitter does not declare this method, so it is
+     * not marked {@code @Override} and the wrapped committers are queried
+     * through reflection; a committer without the method counts as not
+     * supporting recovery.
+     *
+     * @return true if recovery is enabled and supported by all committers
+     * @throws RuntimeException if querying a wrapped committer fails
+     */
+    public boolean isRecoverySupported() {
+        if (!recoverySupported) {
+            return false;
+        }
+        return allSupportRecovery(mapOutputCommitters)
+                && allSupportRecovery(reduceOutputCommitters);
+    }
+
+    /**
+     * Returns true if every non-null committer in {@code committers} reports
+     * that it supports recovery, stopping at the first one that does not.
+     */
+    private static boolean allSupportRecovery(
+            List<Pair<OutputCommitter, POStore>> committers) {
+        for (Pair<OutputCommitter, POStore> committer : committers) {
+            if (committer.first == null) {
+                continue;
+            }
+            Object supported;
+            try {
+                // Use reflection, Hadoop 1.x line does not have such method
+                Method m = committer.first.getClass().getMethod("isRecoverySupported");
+                supported = m.invoke(committer.first);
+            } catch (NoSuchMethodException e) {
+                return false;
+            } catch (java.lang.reflect.InvocationTargetException e) {
+                // Surface the committer's own failure, not the reflection wrapper
+                Throwable cause = e.getCause();
+                if (cause instanceof RuntimeException) {
+                    throw (RuntimeException) cause;
+                }
+                throw new RuntimeException(cause);
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            }
+            if (!Boolean.TRUE.equals(supported)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Recovers the output of a task attempt that ran before the application
+     * master was restarted by forwarding to every non-null map and reduce
+     * output committer, each with a context set up for its own store.
+     * <p>
+     * Hadoop 1.x's OutputCommitter does not declare this method, so the
+     * wrapped committers are called through reflection and those without
+     * it are skipped.
+     *
+     * @param context the task attempt whose output is being recovered
+     * @throws IOException if a wrapped committer fails; an IOException it
+     *         throws is rethrown unchanged, any other failure is wrapped
+     */
+    public void recoverTask(TaskAttemptContext context) throws IOException {
+        recoverCommitters(context, mapOutputCommitters);
+        recoverCommitters(context, reduceOutputCommitters);
+    }
+
+    /**
+     * Calls {@code recoverTask} on each non-null committer in
+     * {@code committers}, passing the context returned by setUpContext for
+     * that committer's store.
+     */
+    private void recoverCommitters(TaskAttemptContext context,
+            List<Pair<OutputCommitter, POStore>> committers) throws IOException {
+        for (Pair<OutputCommitter, POStore> committer : committers) {
+            if (committer.first == null) {
+                continue;
+            }
+            TaskAttemptContext updatedContext = setUpContext(context,
+                    committer.second);
+            try {
+                // Use reflection, Hadoop 1.x line does not have such method
+                Method m = committer.first.getClass().getMethod("recoverTask",
+                        TaskAttemptContext.class);
+                m.invoke(committer.first, updatedContext);
+            } catch (NoSuchMethodException e) {
+                // We are using Hadoop 1.x, ignore
+            } catch (java.lang.reflect.InvocationTargetException e) {
+                // Rethrow the committer's own IOException rather than a
+                // doubly wrapped one; wrap any other cause
+                Throwable cause = e.getCause();
+                if (cause instanceof IOException) {
+                    throw (IOException) cause;
+                }
+                throw new IOException(cause);
+            } catch (IllegalAccessException e) {
+                throw new IOException(e);
+            } catch (SecurityException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
@Override
public void cleanupJob(JobContext context) throws IOException {
// call clean up on all map and reduce committers
Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Tue Sep 17 18:13:52 2013
@@ -53,3 +53,5 @@ pig.optimistic.files.concatenation=false
pig.disable.counter=false
pig.sql.type=hcat
+
+pig.output.committer.recovery.support=false