You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/02/04 20:58:09 UTC
svn commit: r1564450 - in /pig/branches/branch-0.12: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
src/org/apache/pig/impl/io/SequenceFileInterStorage.java
src/org/apache/pig/impl/util/Utils.java
Author: aniket486
Date: Tue Feb 4 19:58:08 2014
New Revision: 1564450
URL: http://svn.apache.org/r1564450
Log:
PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)
Modified:
pig/branches/branch-0.12/CHANGES.txt
pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java
Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Feb 4 19:58:08 2014
@@ -32,6 +32,8 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)
+
PIG-3641: Split "otherwise" producing incorrect output when combined with ColumnPruning (knoguchi)
PIG-3677: ConfigurationUtil.getLocalFSProperties can return an inconsistent property set (rohini)
Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb 4 19:58:08 2014
@@ -576,6 +576,10 @@ public class JobControlCompiler{
LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
+ // tmp file compression setups
+ // PIG-3741 This must be done before setStoreLocation on POStores
+ Utils.setTmpFileCompressionOnConf(pigContext, conf);
+
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
@@ -816,9 +820,6 @@ public class JobControlCompiler{
conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
}
- // tmp file compression setups
- Utils.setTmpFileCompressionOnConf(pigContext, conf);
-
String tmp;
long maxCombinedSplitSize = 0;
if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java Tue Feb 4 19:58:08 2014
@@ -205,6 +205,8 @@ StoreFuncInterface, LoadMetadata {
@Override
public void setStoreLocation(String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
+ Utils.setMapredCompressionCodecProps(conf);
FileOutputFormat.setOutputPath(job, new Path(location));
}
Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java Tue Feb 4 19:58:08 2014
@@ -354,7 +354,20 @@ public class Utils {
}
}
+ public static void setMapredCompressionCodecProps(Configuration conf) {
+ String codec = conf.get(
+ PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
+ if ("".equals(codec) && conf.get("mapred.output.compression.codec") != null) {
+ conf.setBoolean("mapred.output.compress", true);
+ } else if(TEMPFILE_STORAGE.SEQFILE.ensureCodecSupported(codec)) {
+ conf.setBoolean("mapred.output.compress", true);
+ conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+ }
+ // otherwise no codec was specified (or it is not supported): leave the mapred.output.* properties unchanged
+ }
+
public static void setTmpFileCompressionOnConf(PigContext pigContext, Configuration conf) throws IOException{
+ // PIG-3741 This is also called for non-intermediate jobs, do not set any mapred properties here
if (pigContext == null) {
return;
}
@@ -365,7 +378,6 @@ public class Utils {
case INTER:
break;
case SEQFILE:
- conf.setBoolean("mapred.output.compress", true);
conf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "seqfile");
if("".equals(codec)) {
// codec is not specified, ensure is set
@@ -374,7 +386,7 @@ public class Utils {
throw new IOException("mapred.output.compression.codec is not set");
}
} else if(storage.ensureCodecSupported(codec)) {
- conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+ // codec is supported: nothing to set here, mapred.output.* properties are set in setMapredCompressionCodecProps
} else {
throw new IOException("Invalid temporary file compression codec [" + codec + "]. " +
"Expected compression codecs for " + storage.getStorageClass().getName() + " are " + storage.supportedCodecsToString() + ".");