You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/09/17 20:13:52 UTC

svn commit: r1524152 - in /pig/trunk: CHANGES.txt conf/pig.properties src/org/apache/pig/PigConfiguration.java src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java src/pig-default.properties

Author: daijy
Date: Tue Sep 17 18:13:52 2013
New Revision: 1524152

URL: http://svn.apache.org/r1524152
Log:
PIG-3065: pig output format/committer should support recovery for hadoop 0.23

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/src/pig-default.properties

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 17 18:13:52 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-3065: pig output format/committer should support recovery for hadoop 0.23 (daijy)
+
 PIG-3390: Make pig working with HBase 0.95 (jarcec via daijy)
 
 PIG-3431: Return more information for parsing related exceptions. (jeremykarn via daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Sep 17 18:13:52 2013
@@ -213,3 +213,6 @@ pig.location.check.strict=false
 # pig.default.store.func=<fully qualified class name of a StoreFunc implementation>
 # For eg, pig.default.store.func=org.apache.pig.custom.MyCustomStorage
 
+# This option is used to define whether to support recovery to handle the
+# application master getting restarted.
+# pig.output.committer.recovery.support=true

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Sep 17 18:13:52 2013
@@ -93,4 +93,10 @@ public class PigConfiguration {
      * as default in case this is undefined.
      */
     public static final String PIG_DEFAULT_STORE_FUNC = "pig.default.store.func";
+
+    /**
+     * This key is used to define whether to support recovery to handle the
+     * application master getting restarted.
+     */
+    public static final String PIG_OUTPUT_COMMITTER_RECOVERY = "pig.output.committer.recovery.support";
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Sep 17 18:13:52 2013
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.PigConfiguration;
 
 /**
  * A specialization of the default FileOutputCommitter to allow
@@ -54,6 +55,8 @@ public class PigOutputCommitter extends 
      */
     List<Pair<OutputCommitter, POStore>> reduceOutputCommitters;
     
+    boolean recoverySupported;
+    
     /**
      * @param context
      * @param mapStores 
@@ -66,7 +69,7 @@ public class PigOutputCommitter extends 
         // create and store the map and reduce output committers
         mapOutputCommitters = getCommitters(context, mapStores);
         reduceOutputCommitters = getCommitters(context, reduceStores);
-        
+        recoverySupported = context.getConfiguration().getBoolean(PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, false);
     }
 
     /**
@@ -145,6 +148,82 @@ public class PigOutputCommitter extends 
         }
     }
 
+    public boolean isRecoverySupported() {
+        if (!recoverySupported)
+            return false;
+        boolean allOutputCommitterSupportRecovery = true;
+        // check isRecoverySupported on all map and reduce committers
+        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+            if (mapCommitter.first!=null) {
+                try {
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+                            && (Boolean)m.invoke(mapCommitter.first);
+                } catch (NoSuchMethodException e) {
+                    allOutputCommitterSupportRecovery = false;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                if (!allOutputCommitterSupportRecovery)
+                    return false;
+            }
+        }
+        for (Pair<OutputCommitter, POStore> reduceCommitter :
+            reduceOutputCommitters) {
+            if (reduceCommitter.first!=null) {
+                try {
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+                            && (Boolean)m.invoke(reduceCommitter.first);
+                } catch (NoSuchMethodException e) {
+                    allOutputCommitterSupportRecovery = false;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                if (!allOutputCommitterSupportRecovery)
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    public void recoverTask(TaskAttemptContext context) throws IOException {
+        // call recoverTask on all map and reduce committers
+        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+            if (mapCommitter.first!=null) {
+                TaskAttemptContext updatedContext = setUpContext(context,
+                        mapCommitter.second);
+                try {
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+                    m.invoke(mapCommitter.first, updatedContext);
+                } catch (NoSuchMethodException e) {
+                    // We are using Hadoop 1.x, ignore
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+        for (Pair<OutputCommitter, POStore> reduceCommitter :
+            reduceOutputCommitters) {
+            if (reduceCommitter.first!=null) {
+                TaskAttemptContext updatedContext = setUpContext(context,
+                        reduceCommitter.second);
+                try {
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+                    m.invoke(reduceCommitter.first, updatedContext);
+                } catch (NoSuchMethodException e) {
+                    // We are using Hadoop 1.x, ignore
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+    }
+    
     @Override
     public void cleanupJob(JobContext context) throws IOException {
         // call clean up on all map and reduce committers

Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1524152&r1=1524151&r2=1524152&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Tue Sep 17 18:13:52 2013
@@ -53,3 +53,5 @@ pig.optimistic.files.concatenation=false
 pig.disable.counter=false
 
 pig.sql.type=hcat
+
+pig.output.committer.recovery.support=false