Posted to commits@pig.apache.org by da...@apache.org on 2014/04/14 21:20:54 UTC

svn commit: r1587289 - in /pig/trunk: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ src/org/apache/pig/backend/hadoop/executionengine/fetch/

Author: daijy
Date: Mon Apr 14 19:20:53 2014
New Revision: 1587289

URL: http://svn.apache.org/r1587289
Log:
PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 14 19:20:53 2014
@@ -105,6 +105,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides (lbendig via daijy)
+
 PIG-3887: TestMRJobStats is broken in trunk (cheolsoo)
 
 PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)

Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Apr 14 19:20:53 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
     public static void unsetConf(Configuration conf, String key) {
         // Not supported in Hadoop 0.20/1.x
     }
+    
+    /**
+     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop 
+     * @param conf
+     * @param taskAttemptID
+     */
+    public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+        conf.set("mapred.task.id", taskAttemptID.toString());
+    }
+    
 }
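
On Hadoop 0.20/1.x the shim above carries the attempt id in the legacy
"mapred.task.id" property, i.e. the same key a real map task would see. A
minimal sketch of the read side on that code path (the helper class and
method names are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class TaskIdRoundTrip {
        // Recovers the attempt id that setTaskAttemptId() stored under the
        // legacy "mapred.task.id" key; returns null when neither fetch mode
        // nor Hadoop itself has set it.
        public static TaskAttemptID readTaskAttemptId(Configuration conf) {
            String id = conf.get("mapred.task.id");
            return id == null ? null : TaskAttemptID.forName(id);
        }
    }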

Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Apr 14 19:20:53 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
     public static void unsetConf(Configuration conf, String key) {
         conf.unset(key);
     }
+    
+    /**
+     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop 
+     * @param conf
+     * @param taskAttemptID
+     */
+    public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+        conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
+    }
+    
 }
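
Unlike the 0.20 shim, the Hadoop 0.23/2.x shim above records only the numeric
attempt number, stored as an int under "mapreduce.job.application.attempt.id".
A small sketch of the corresponding read side (class and method names are
illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class AttemptIdProbe {
        // Reads back the numeric attempt id written by the Hadoop 0.23/2.x
        // shim; 0 is an assumed default for when fetch mode has not set it.
        public static int attemptId(Configuration conf) {
            return conf.getInt("mapreduce.job.application.attempt.id", 0);
        }
    }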

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Apr 14 19:20:53 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -34,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
@@ -70,17 +72,22 @@ public class FetchLauncher {
      * @throws IOException
      */
     public PigStats launchPig(PhysicalPlan pp) throws IOException {
-        POStore poStore = (POStore) pp.getLeaves().get(0);
-        init(pp, poStore);
-
-        // run fetch
-        runPipeline(poStore);
-
-        UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
-                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
-        udfFinisher.visit();
-
-        return PigStats.start(new EmptyPigStats(pigContext, poStore));
+        try {
+            POStore poStore = (POStore) pp.getLeaves().get(0);
+            init(pp, poStore);
+
+            // run fetch
+            runPipeline(poStore);
+            
+            UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+                    new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+            udfFinisher.visit();
+            
+            return PigStats.start(new EmptyPigStats(pigContext, poStore));
+        }
+        finally {
+            UDFContext.getUDFContext().addJobConf(null);
+        }
     }
 
     /**
@@ -112,6 +119,10 @@ public class FetchLauncher {
 
         poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
         poStore.setUp();
+
+        TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
+        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+        
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
         }