You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/18 19:31:55 UTC

svn commit: r805506 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/plan/

Author: zshao
Date: Tue Aug 18 17:31:54 2009
New Revision: 805506

URL: http://svn.apache.org/viewvc?rev=805506&view=rev
Log:
HIVE-759. Add hive.intermediate.compression.codec/type option. (Yongqiang He via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Aug 18 17:31:54 2009
@@ -23,6 +23,9 @@
     HIVE-760. Add version info to META-INF/MANIFEST.MF.
     (Bill Graham via zshao)
 
+    HIVE-759. Add "hive.intermediate.compression.codec/type" option.
+    (Yongqiang He via zshao)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Aug 18 17:31:54 2009
@@ -63,6 +63,8 @@
     SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
     COMPRESSRESULT("hive.exec.compress.output", false),
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
+    COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
+    COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
     BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long)(1000*1000*1000)),
     MAXREDUCERS("hive.exec.reducers.max", 999),
     PREEXECHOOKS("hive.exec.pre.hooks", ""),

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Aug 18 17:31:54 2009
@@ -36,7 +36,12 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.LzoCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 /**
  * File Sink operator implementation
@@ -113,8 +118,21 @@
       Path parent = Utilities.toTempPath(specPath);
       finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc, hiveOutputFormat, isCompressed, finalPath);
       tableDesc tableInfo = conf.getTableInfo();
-
-      outWriter = getRecordWriter(jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
+      JobConf jc_output = jc;
+      if (isCompressed) {
+        jc_output = new JobConf(jc); // private copy: per-sink compression settings must not leak into the shared jc
+        String codecStr = conf.getCompressCodec();
+        if (codecStr != null && !codecStr.trim().equals("")) {
+          Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr.trim()); // load the same trimmed name the guard validated
+          FileOutputFormat.setOutputCompressorClass(jc_output, codec);
+        }
+        String type = conf.getCompressType();
+        if (type != null && !type.trim().equals("")) {
+          CompressionType style = CompressionType.valueOf(type.trim()); // parse the same trimmed name the guard validated
+          SequenceFileOutputFormat.setOutputCompressionType(jc_output, style); // must target jc_output, the conf the writer below is built from
+        }
+      }
+      outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
       autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Aug 18 17:31:54 2009
@@ -627,11 +627,13 @@
       PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
     
     // Create a file sink operator for this file name
-    Operator<? extends Serializable> fs_op =
-      putOpInsertMap(OperatorFactory.get
-                     (new fileSinkDesc(taskTmpDir, tt_desc,
-                                       parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
-                      parent.getSchema()), null, parseCtx);
+    boolean compressIntermediate = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+    fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+    if (compressIntermediate) {
+      desc.setCompressCodec(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+      desc.setCompressType(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+    }
+    Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory.get(desc, parent.getSchema()), null, parseCtx);
     
     // replace the reduce child with this operator
     List<Operator<? extends Serializable>> childOpList = parent.getChildOperators();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java Tue Aug 18 17:31:54 2009
@@ -27,6 +27,8 @@
   private tableDesc tableInfo;
   private boolean compressed;
   private int destTableId;
+  private String compressCodec;
+  private String compressType;
 
 
   public fileSinkDesc() { }
@@ -85,4 +87,16 @@
   public void setDestTableId(int destTableId) {
     this.destTableId = destTableId;
   }
+  public String getCompressCodec() { // compression codec class name for this sink; null until setCompressCodec is called
+    return compressCodec;
+  }
+  public void setCompressCodec(String intermediateCompressorCodec) { // stores the codec class name as given, without validation
+    this.compressCodec = intermediateCompressorCodec;
+  }
+  public String getCompressType() { // compression type name for this sink; null until setCompressType is called
+    return compressType;
+  }
+  public void setCompressType(String intermediateCompressType) { // stores the compression type name as given, without validation
+    this.compressType = intermediateCompressType;
+  }
 }