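You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion; the commit itself is available at http://svn.apache.org/r1564450.)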
Posted to commits@pig.apache.org by an...@apache.org on 2014/02/04 20:58:09 UTC

svn commit: r1564450 - in /pig/branches/branch-0.12: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java src/org/apache/pig/impl/io/SequenceFileInterStorage.java src/org/apache/pig/impl/util/Utils.java

Author: aniket486
Date: Tue Feb  4 19:58:08 2014
New Revision: 1564450

URL: http://svn.apache.org/r1564450
Log:
PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)

Modified:
    pig/branches/branch-0.12/CHANGES.txt
    pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java

Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Feb  4 19:58:08 2014
@@ -32,6 +32,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)
+
 PIG-3641: Split "otherwise" producing incorrect output when combined with ColumnPruning (knoguchi)
 
 PIG-3677: ConfigurationUtil.getLocalFSProperties can return an inconsistent property set (rohini)

Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb  4 19:58:08 2014
@@ -576,6 +576,10 @@ public class JobControlCompiler{
             LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
             LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
 
+            // tmp file compression setups
+            // PIG-3741 This must be done before setStoreLocation on POStores
+            Utils.setTmpFileCompressionOnConf(pigContext, conf);
+
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
@@ -816,9 +820,6 @@ public class JobControlCompiler{
                 conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
             }
 
-            // tmp file compression setups
-            Utils.setTmpFileCompressionOnConf(pigContext, conf);
-
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java Tue Feb  4 19:58:08 2014
@@ -205,6 +205,8 @@ StoreFuncInterface, LoadMetadata {
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        Utils.setMapredCompressionCodecProps(conf);
         FileOutputFormat.setOutputPath(job, new Path(location));
     }
 

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java Tue Feb  4 19:58:08 2014
@@ -354,7 +354,20 @@ public class Utils {
         }
     }
 
+    public static void setMapredCompressionCodecProps(Configuration conf) {
+        String codec = conf.get(
+                PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
+        if ("".equals(codec) && conf.get("mapred.output.compression.codec") != null) {
+            conf.setBoolean("mapred.output.compress", true);
+        } else if(TEMPFILE_STORAGE.SEQFILE.ensureCodecSupported(codec)) {
+            conf.setBoolean("mapred.output.compress", true);
+            conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+        }
+        // no codec specified
+    }
+
     public static void setTmpFileCompressionOnConf(PigContext pigContext, Configuration conf) throws IOException{
+        // PIG-3741 This is also called for non-intermediate jobs, do not set any mapred properties here
         if (pigContext == null) {
             return;
         }
@@ -365,7 +378,6 @@ public class Utils {
         case INTER:
             break;
         case SEQFILE:
-            conf.setBoolean("mapred.output.compress", true);
             conf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "seqfile");
             if("".equals(codec)) {
                 // codec is not specified, ensure  is set
@@ -374,7 +386,7 @@ public class Utils {
                     throw new IOException("mapred.output.compression.codec is not set");
                 }
             } else if(storage.ensureCodecSupported(codec)) {
-                conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+                // do nothing
             } else {
                 throw new IOException("Invalid temporary file compression codec [" + codec + "]. " +
                         "Expected compression codecs for " + storage.getStorageClass().getName() + " are " + storage.supportedCodecsToString() + ".");