You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/18 19:31:55 UTC
svn commit: r805506 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/plan/
Author: zshao
Date: Tue Aug 18 17:31:54 2009
New Revision: 805506
URL: http://svn.apache.org/viewvc?rev=805506&view=rev
Log:
HIVE-759. Add hive.intermediate.compression.codec/type option. (Yongqiang He via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Aug 18 17:31:54 2009
@@ -23,6 +23,9 @@
HIVE-760. Add version info to META-INF/MANIFEST.MF.
(Bill Graham via zshao)
+ HIVE-759. Add "hive.intermediate.compression.codec/type" option.
+ (Yongqiang He via zshao)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Aug 18 17:31:54 2009
@@ -63,6 +63,8 @@
SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
+ COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
+ COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long)(1000*1000*1000)),
MAXREDUCERS("hive.exec.reducers.max", 999),
PREEXECHOOKS("hive.exec.pre.hooks", ""),
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Aug 18 17:31:54 2009
@@ -36,7 +36,12 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.LzoCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
/**
* File Sink operator implementation
@@ -113,8 +118,21 @@
Path parent = Utilities.toTempPath(specPath);
finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc, hiveOutputFormat, isCompressed, finalPath);
tableDesc tableInfo = conf.getTableInfo();
-
- outWriter = getRecordWriter(jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
+ JobConf jc_output = jc;
+ if (isCompressed) {
+ jc_output = new JobConf(jc);
+ String codecStr = conf.getCompressCodec();
+ if (codecStr != null && !codecStr.trim().equals("")) {
+ Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
+ FileOutputFormat.setOutputCompressorClass(jc_output, codec);
+ }
+ String type = conf.getCompressType();
+ if(type !=null && !type.trim().equals("")) {
+ CompressionType style = CompressionType.valueOf(type);
+ SequenceFileOutputFormat.setOutputCompressionType(jc_output, style);
+ }
+ }
+ outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
// in recent hadoop versions, use deleteOnExit to clean tmp files.
autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Aug 18 17:31:54 2009
@@ -627,11 +627,13 @@
PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
// Create a file sink operator for this file name
- Operator<? extends Serializable> fs_op =
- putOpInsertMap(OperatorFactory.get
- (new fileSinkDesc(taskTmpDir, tt_desc,
- parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
- parent.getSchema()), null, parseCtx);
+ boolean compressIntermediate = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+ fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+ if (compressIntermediate) {
+ desc.setCompressCodec(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+ desc.setCompressType(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+ }
+ Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory.get(desc, parent.getSchema()), null, parseCtx);
// replace the reduce child with this operator
List<Operator<? extends Serializable>> childOpList = parent.getChildOperators();
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java?rev=805506&r1=805505&r2=805506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java Tue Aug 18 17:31:54 2009
@@ -27,6 +27,8 @@
private tableDesc tableInfo;
private boolean compressed;
private int destTableId;
+ private String compressCodec;
+ private String compressType;
public fileSinkDesc() { }
@@ -85,4 +87,16 @@
public void setDestTableId(int destTableId) {
this.destTableId = destTableId;
}
+ public String getCompressCodec() {
+ return compressCodec;
+ }
+ public void setCompressCodec(String intermediateCompressorCodec) {
+ this.compressCodec = intermediateCompressorCodec;
+ }
+ public String getCompressType() {
+ return compressType;
+ }
+ public void setCompressType(String intermediateCompressType) {
+ this.compressType = intermediateCompressType;
+ }
}