You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/03/10 23:46:49 UTC

svn commit: r1576117 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/util/

Author: aniket486
Date: Mon Mar 10 22:46:49 2014
New Revision: 1576117

URL: http://svn.apache.org/r1576117
Log:
PIG-3801: Auto local mode does not call storeSchema (aniket486)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 10 22:46:49 2014
@@ -99,6 +99,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3801: Auto local mode does not call storeSchema (aniket486)
+
 PIG-3754: InputSizeReducerEstimator.getTotalInputFileSize reports incorrect size (aniket486)
 
 PIG-3679: e2e StreamingPythonUDFs_10 fails in trunk (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Mar 10 22:46:49 2014
@@ -621,7 +621,7 @@ public class JobControlCompiler{
                 }
             }
 
-            if(isLocal(pigContext, conf)) {
+            if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
             conf.set("pig.inputs", ObjectSerializer.serialize(inp));
@@ -1529,10 +1529,6 @@ public class JobControlCompiler{
         }
     }
 
-    private static boolean isLocal(PigContext pigContext, Configuration conf) {
-        return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
-    }
-
     private static String addSingleFileToDistributedCache(
             PigContext pigContext, Configuration conf, String filename,
             String prefix) throws IOException {
@@ -1547,7 +1543,7 @@ public class JobControlCompiler{
 
         // XXX Hadoop currently doesn't support distributed cache in local mode.
         // This line will be removed after the support is added by Hadoop team.
-        if (!isLocal(pigContext, conf)) {
+        if (!Utils.isLocal(pigContext, conf)) {
             symlink = prefix + "_"
                     + Integer.toString(System.identityHashCode(filename)) + "_"
                     + Long.toString(System.currentTimeMillis());
@@ -1721,7 +1717,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (isLocal(pigContext, conf)) return;
+            if (Utils.isLocal(pigContext, conf)) return;
 
             // set up distributed cache for the replicated files
             FileSpec[] replFiles = join.getReplFiles();
@@ -1777,7 +1773,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (isLocal(pigContext, conf)) return;
+            if (Utils.isLocal(pigContext, conf)) return;
 
             String indexFile = join.getIndexFile();
 
@@ -1801,7 +1797,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (isLocal(pigContext, conf)) return;
+            if (Utils.isLocal(pigContext, conf)) return;
 
             String indexFile = mergeCoGrp.getIndexFileName();
 
@@ -1838,7 +1834,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (isLocal(pigContext, conf)) return;
+            if (Utils.isLocal(pigContext, conf)) return;
 
             // set up distributed cache for files indicated by the UDF
             String[] files = func.getCacheFiles();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Mar 10 22:46:49 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -477,7 +476,7 @@ public class MapReduceLauncher extends L
             for (Job job : succJobs) {
                 List<POStore> sts = jcc.getStores(job);
                 for (POStore st : sts) {
-                    if (pc.getExecType() == ExecType.LOCAL) {
+                    if (Utils.isLocal(pc, job.getJobConf())) {
                         HadoopShims.storeSchemaForLocal(job, st);
                     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Mar 10 22:46:49 2014
@@ -48,6 +48,7 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 
 public class PigInputFormat extends InputFormat<Text, Tuple> {
 
@@ -226,7 +227,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!pigContext.getExecType().isLocal()) {
+                if(!Utils.isLocal(pigContext, conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1576117&r1=1576116&r2=1576117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Mar 10 22:46:49 2014
@@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.io.SequenceFileInterStorage;
@@ -542,5 +543,11 @@ public class Utils {
         }
         return true;
     }
+    
+
+
+    public static boolean isLocal(PigContext pigContext, Configuration conf) {
+        return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+    }
 
 }