Posted to commits@hive.apache.org by br...@apache.org on 2014/10/15 19:57:42 UTC

svn commit: r1632143 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/io/parquet/ java/org/apache/hadoop/hive/ql/io/parquet/write/ test/org/apache/hadoop/hive/ql/io/parquet/

Author: brock
Date: Wed Oct 15 17:57:41 2014
New Revision: 1632143

URL: http://svn.apache.org/r1632143
Log:
HIVE-7858 - Parquet compression should be configurable via table property (Ferdinand Xu via Brock)
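
For context: this makes the parquet.compression table property authoritative for Hive-written Parquet files, so a per-table codec (e.g. TBLPROPERTIES ('parquet.compression'='SNAPPY') on a Parquet-backed table) overrides the job-level parquet.compression setting. A minimal illustrative sketch of the key involved; the class name here is hypothetical, while ParquetOutputFormat.COMPRESSION is the real parquet-mr constant the diff below reads:

    import java.util.Properties;

    import parquet.hadoop.ParquetOutputFormat;

    // Illustrative only, not part of the commit: the table descriptor's
    // Properties carry the same key that Hive's TBLPROPERTIES clause populates.
    public class TableCodecProperty {
      public static void main(String[] args) {
        Properties tableProperties = new Properties();
        // ParquetOutputFormat.COMPRESSION is the "parquet.compression" key.
        tableProperties.setProperty(ParquetOutputFormat.COMPRESSION, "SNAPPY");
        System.out.println(
            tableProperties.getProperty(ParquetOutputFormat.COMPRESSION)); // SNAPPY
      }
    }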

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Wed Oct 15 17:57:41 2014
@@ -21,6 +21,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -40,6 +41,8 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.ContextUtil;
 
 /**
  *
@@ -110,15 +113,19 @@ public class MapredParquetOutputFormat e
     }
 
     DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
-    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+
+    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+            progress,tableProperties);
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
       ParquetOutputFormat<ArrayWritable> realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
-      Progressable progress
+      Progressable progress,
+      Properties tableProperties
       ) throws IOException {
-    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+            progress,tableProperties);
   }
 }
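
Note that getParquerRecordWriterWrapper is protected, so the signature change above ripples into subclasses: any override must adopt the new Properties parameter, as the anonymous subclass in the test diff further down does. A hypothetical subclass for illustration (MyParquetOutputFormat is not in the commit; the signature is taken verbatim from the hunk above):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
    import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Progressable;

    import parquet.hadoop.ParquetOutputFormat;

    public class MyParquetOutputFormat extends MapredParquetOutputFormat {
      @Override
      protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
          ParquetOutputFormat<ArrayWritable> realOutputFormat,
          JobConf jobConf,
          String finalOutPath,
          Progressable progress,
          Properties tableProperties) throws IOException {
        // Inspect or adjust tableProperties here, then delegate.
        return super.getParquerRecordWriterWrapper(
            realOutputFormat, jobConf, finalOutPath, progress, tableProperties);
      }
    }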

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Wed Oct 15 17:57:41 2014
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
 public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
@@ -43,7 +45,8 @@ public class ParquetRecordWriterWrapper 
       final OutputFormat<Void, ArrayWritable> realOutputFormat,
       final JobConf jobConf,
       final String name,
-      final Progressable progress) throws IOException {
+      final Progressable progress, Properties tableProperties) throws
+          IOException {
     try {
       // create a TaskInputOutputContext
       TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
@@ -53,7 +56,21 @@ public class ParquetRecordWriterWrapper 
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
       LOG.info("creating real writer to write at " + name);
-      realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+
+      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+      if (compressionName != null && !compressionName.isEmpty()) {
+        //get override compression properties via "tblproperties" clause if it is set
+        LOG.debug("get override compression properties via tblproperties");
+
+        ContextUtil.getConfiguration(taskContext);
+        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
+                new Path(name), codecName);
+      } else {
+        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
+                new Path(name));
+      }
+
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
       throw new IOException(e);
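
The override branch above fails fast on bad input: CompressionCodecName.fromConf upper-cases the configured name and resolves it against parquet-mr's codec enum, throwing IllegalArgumentException for unknown values and mapping null to UNCOMPRESSED. A standalone sketch of that behavior, independent of the wrapper (the class name is illustrative):

    import parquet.hadoop.metadata.CompressionCodecName;

    public class FromConfSketch {
      public static void main(String[] args) {
        // Known names resolve case-insensitively.
        System.out.println(CompressionCodecName.fromConf("snappy")); // SNAPPY
        System.out.println(CompressionCodecName.fromConf("GZIP"));   // GZIP
        // A null name falls back to UNCOMPRESSED rather than failing.
        System.out.println(CompressionCodecName.fromConf(null));     // UNCOMPRESSED
        // Unknown names throw IllegalArgumentException; this is what the
        // new negative test at the end of this commit exercises.
        CompressionCodecName.fromConf("unsupported");                // throws
      }
    }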

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1632143&r1=1632142&r2=1632143&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java Wed Oct 15 17:57:41 2014
@@ -73,7 +73,8 @@ public class TestMapredParquetOutputForm
             ParquetOutputFormat<ArrayWritable> realOutputFormat,
             JobConf jobConf,
             String finalOutPath,
-            Progressable progress
+            Progressable progress,
+            Properties tableProperties
             ) throws IOException {
           assertEquals(outputFormat, realOutputFormat);
           assertNotNull(jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA));
@@ -87,4 +88,17 @@ public class TestMapredParquetOutputForm
       assertEquals("passed tests", e.getMessage());
     }
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidCompressionTableProperties() throws IOException {
+    Properties tableProps = new Properties();
+    tableProps.setProperty("parquet.compression", "unsupported");
+    tableProps.setProperty("columns", "foo,bar");
+    tableProps.setProperty("columns.types", "int:int");
+
+    JobConf jobConf = new JobConf();
+
+    new MapredParquetOutputFormat().getHiveRecordWriter(jobConf,
+            new Path("/foo"), null, false, tableProps, null);
+  }
 }
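
A note on the new test: the columns and columns.types entries are set so that getHiveRecordWriter can build the Parquet schema and reach the codec check, and the dummy "/foo" output path plus null Progressable are safe because CompressionCodecName.fromConf throws on the unsupported name before the wrapper ever opens a file.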