You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/04/14 21:20:54 UTC
svn commit: r1587289 - in /pig/trunk: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/backend/hadoop/executionengine/fetch/
Author: daijy
Date: Mon Apr 14 19:20:53 2014
New Revision: 1587289
URL: http://svn.apache.org/r1587289
Log:
PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 14 19:20:53 2014
@@ -105,6 +105,8 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides (lbendig via daijy)
+
PIG-3887: TestMRJobStats is broken in trunk (cheolsoo)
PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Apr 14 19:20:53 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
// Not supported in Hadoop 0.20/1.x
}
+
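+ /**
+ * Fetch mode needs to explicitly set the task id, which is otherwise done by Hadoop.
+ * @param conf configuration in which the "mapred.task.id" property is set
+ * @param taskAttemptID task attempt id whose string form is stored as the property value
+ */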
+ public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+ conf.set("mapred.task.id", taskAttemptID.toString());
+ }
+
}
Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Apr 14 19:20:53 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
conf.unset(key);
}
+
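+ /**
+ * Fetch mode needs to explicitly set the task id, which is otherwise done by Hadoop.
+ * @param conf configuration in which the "mapreduce.job.application.attempt.id" property is set
+ * @param taskAttemptID task attempt id whose numeric id (getId()) is stored as the property value
+ */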
+ public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+ conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1587289&r1=1587288&r2=1587289&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Apr 14 19:20:53 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -34,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
@@ -70,17 +72,22 @@ public class FetchLauncher {
* @throws IOException
*/
public PigStats launchPig(PhysicalPlan pp) throws IOException {
- POStore poStore = (POStore) pp.getLeaves().get(0);
- init(pp, poStore);
-
- // run fetch
- runPipeline(poStore);
-
- UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
- new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
- udfFinisher.visit();
-
- return PigStats.start(new EmptyPigStats(pigContext, poStore));
+ try {
+ POStore poStore = (POStore) pp.getLeaves().get(0);
+ init(pp, poStore);
+
+ // run fetch
+ runPipeline(poStore);
+
+ UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+ new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+ udfFinisher.visit();
+
+ return PigStats.start(new EmptyPigStats(pigContext, poStore));
+ }
+ finally {
+ UDFContext.getUDFContext().addJobConf(null);
+ }
}
/**
@@ -112,6 +119,10 @@ public class FetchLauncher {
poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
poStore.setUp();
+
+ TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
+ HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
}