You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/15 19:57:42 UTC
svn commit: r1632143 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/io/parquet/
java/org/apache/hadoop/hive/ql/io/parquet/write/
test/org/apache/hadoop/hive/ql/io/parquet/
Author: brock
Date: Wed Oct 15 17:57:41 2014
New Revision: 1632143
URL: http://svn.apache.org/r1632143
Log:
HIVE-7858 - Parquet compression should be configurable via table property (Ferdinand Xu via Brock)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Wed Oct 15 17:57:41 2014
@@ -21,6 +21,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -40,6 +41,8 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.ContextUtil;
/**
*
@@ -110,15 +113,19 @@ public class MapredParquetOutputFormat e
}
DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
- return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+
+ return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+ progress,tableProperties);
}
protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
ParquetOutputFormat<ArrayWritable> realOutputFormat,
JobConf jobConf,
String finalOutPath,
- Progressable progress
+ Progressable progress,
+ Properties tableProperties
) throws IOException {
- return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+ return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+ progress,tableProperties);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Wed Oct 15 17:57:41 2014
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.ql.io.parquet.write;
import java.io.IOException;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.util.ContextUtil;
public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
@@ -43,7 +45,8 @@ public class ParquetRecordWriterWrapper
final OutputFormat<Void, ArrayWritable> realOutputFormat,
final JobConf jobConf,
final String name,
- final Progressable progress) throws IOException {
+ final Progressable progress, Properties tableProperties) throws
+ IOException {
try {
// create a TaskInputOutputContext
TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
@@ -53,7 +56,21 @@ public class ParquetRecordWriterWrapper
taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
LOG.info("creating real writer to write at " + name);
- realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+
+ String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+ if (compressionName != null && !compressionName.isEmpty()) {
+ //get override compression properties via "tblproperties" clause if it is set
+ LOG.debug("get override compression properties via tblproperties");
+
+ ContextUtil.getConfiguration(taskContext);
+ CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+ realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
+ new Path(name), codecName);
+ } else {
+ realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
+ new Path(name));
+ }
+
LOG.info("real writer: " + realWriter);
} catch (final InterruptedException e) {
throw new IOException(e);
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java Wed Oct 15 17:57:41 2014
@@ -73,7 +73,8 @@ public class TestMapredParquetOutputForm
ParquetOutputFormat<ArrayWritable> realOutputFormat,
JobConf jobConf,
String finalOutPath,
- Progressable progress
+ Progressable progress,
+ Properties tableProperties
) throws IOException {
assertEquals(outputFormat, realOutputFormat);
assertNotNull(jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA));
@@ -87,4 +88,17 @@ public class TestMapredParquetOutputForm
assertEquals("passed tests", e.getMessage());
}
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidCompressionTableProperties() throws IOException {
+ Properties tableProps = new Properties();
+ tableProps.setProperty("parquet.compression", "unsupported");
+ tableProps.setProperty("columns", "foo,bar");
+ tableProps.setProperty("columns.types", "int:int");
+
+ JobConf jobConf = new JobConf();
+
+ new MapredParquetOutputFormat().getHiveRecordWriter(jobConf,
+ new Path("/foo"), null, false, tableProps, null);
+ }
}