You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/03/10 23:46:49 UTC
svn commit: r1576117 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/util/
Author: aniket486
Date: Mon Mar 10 22:46:49 2014
New Revision: 1576117
URL: http://svn.apache.org/r1576117
Log:
PIG-3801: Auto local mode does not call storeSchema (aniket486)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 10 22:46:49 2014
@@ -99,6 +99,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3801: Auto local mode does not call storeSchema (aniket486)
+
PIG-3754: InputSizeReducerEstimator.getTotalInputFileSize reports incorrect size (aniket486)
PIG-3679: e2e StreamingPythonUDFs_10 fails in trunk (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Mar 10 22:46:49 2014
@@ -621,7 +621,7 @@ public class JobControlCompiler{
}
}
- if(isLocal(pigContext, conf)) {
+ if(Utils.isLocal(pigContext, conf)) {
ConfigurationUtil.replaceConfigForLocalMode(conf);
}
conf.set("pig.inputs", ObjectSerializer.serialize(inp));
@@ -1529,10 +1529,6 @@ public class JobControlCompiler{
}
}
- private static boolean isLocal(PigContext pigContext, Configuration conf) {
- return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
- }
-
private static String addSingleFileToDistributedCache(
PigContext pigContext, Configuration conf, String filename,
String prefix) throws IOException {
@@ -1547,7 +1543,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added by Hadoop team.
- if (!isLocal(pigContext, conf)) {
+ if (!Utils.isLocal(pigContext, conf)) {
symlink = prefix + "_"
+ Integer.toString(System.identityHashCode(filename)) + "_"
+ Long.toString(System.currentTimeMillis());
@@ -1721,7 +1717,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (isLocal(pigContext, conf)) return;
+ if (Utils.isLocal(pigContext, conf)) return;
// set up distributed cache for the replicated files
FileSpec[] replFiles = join.getReplFiles();
@@ -1777,7 +1773,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (isLocal(pigContext, conf)) return;
+ if (Utils.isLocal(pigContext, conf)) return;
String indexFile = join.getIndexFile();
@@ -1801,7 +1797,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (isLocal(pigContext, conf)) return;
+ if (Utils.isLocal(pigContext, conf)) return;
String indexFile = mergeCoGrp.getIndexFileName();
@@ -1838,7 +1834,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (isLocal(pigContext, conf)) return;
+ if (Utils.isLocal(pigContext, conf)) return;
// set up distributed cache for files indicated by the UDF
String[] files = func.getCacheFiles();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Mar 10 22:46:49 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
@@ -477,7 +476,7 @@ public class MapReduceLauncher extends L
for (Job job : succJobs) {
List<POStore> sts = jcc.getStores(job);
for (POStore st : sts) {
- if (pc.getExecType() == ExecType.LOCAL) {
+ if (Utils.isLocal(pc, job.getJobConf())) {
HadoopShims.storeSchemaForLocal(job, st);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Mar 10 22:46:49 2014
@@ -48,6 +48,7 @@ import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
public class PigInputFormat extends InputFormat<Text, Tuple> {
@@ -226,7 +227,7 @@ public class PigInputFormat extends Inpu
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(!pigContext.getExecType().isLocal()) {
+ if(!Utils.isLocal(pigContext, conf)) {
fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Mar 10 22:46:49 2014
@@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.io.SequenceFileInterStorage;
@@ -542,5 +543,11 @@ public class Utils {
}
return true;
}
+
+
+
+ public static boolean isLocal(PigContext pigContext, Configuration conf) {
+ return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+ }
}