Posted to commits@hive.apache.org by br...@apache.org on 2014/11/14 21:02:46 UTC

svn commit: r1639777 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/io/parquet/serde/ java/org/apache/hadoop/hive/ql/io/parquet/write/ test/queries/clientpositive/ test/results/clientpositive/

Author: brock
Date: Fri Nov 14 20:02:45 2014
New Revision: 1639777

URL: http://svn.apache.org/r1639777
Log:
HIVE-8823 - Add additional serde properties for parquet (Ferdinand Xu via Brock)
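
For context: with this patch, parquet.compression, parquet.block.size, and parquet.enable.dictionary can all be set per table and are routed through to the Parquet writer. A minimal usage sketch in HiveQL (table name and property values are illustrative, not from this commit):

    -- hypothetical table; the values shown are examples, not defaults
    CREATE TABLE parquet_overrides (a INT, b STRING)
    STORED AS PARQUET
    TBLPROPERTIES (
      "parquet.compression"="SNAPPY",
      "parquet.block.size"="134217728",
      "parquet.enable.dictionary"="true");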

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
    hive/trunk/ql/src/test/queries/clientpositive/create_like.q
    hive/trunk/ql/src/test/results/clientpositive/create_like.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1639777&r1=1639776&r2=1639777&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Fri Nov 14 20:02:45 2014
@@ -64,6 +64,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
 /**
@@ -79,8 +80,9 @@ public class ParquetHiveSerDe extends Ab
   public static final Text MAP = new Text("map");
   public static final Text ARRAY = new Text("bag");
 
-  // default compression type is uncompressed
-  private static final String DEFAULTCOMPRESSION = "UNCOMPRESSED";
+  // default compression type for parquet output format
+  private static final String DEFAULTCOMPRESSION =
+          ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
 
   // Map precision to the number bytes needed for binary conversion.
   public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
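
The change above replaces a hard-coded "UNCOMPRESSED" string with parquet-mr's own writer default, so the serde stays in sync if Parquet ever changes that default; at this revision both resolve to the same value. A standalone sketch to illustrate (class name hypothetical, not part of the patch):

    import parquet.hadoop.ParquetWriter;
    import parquet.hadoop.metadata.CompressionCodecName;

    // Hypothetical check: the serde default is now whatever parquet-mr
    // ships as ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.
    public class DefaultCompressionSketch {
      public static void main(String[] args) {
        CompressionCodecName codec = ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME;
        System.out.println(codec.name()); // prints UNCOMPRESSED at this revision
      }
    }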

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1639777&r1=1639776&r2=1639777&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Fri Nov 14 20:02:45 2014
@@ -18,10 +18,12 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +57,13 @@ public class ParquetRecordWriterWrapper 
       }
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
+      LOG.info("initialize serde with table properties.");
+      initializeSerProperties(taskContext, tableProperties);
+
       LOG.info("creating real writer to write at " + name);
 
-      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
-      if (compressionName != null && !compressionName.isEmpty()) {
-        //get override compression properties via "tblproperties" clause if it is set
-        LOG.debug("get override compression properties via tblproperties");
-
-        ContextUtil.getConfiguration(taskContext);
-        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
-                new Path(name), codecName);
-      } else {
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
-                new Path(name));
-      }
+      realWriter =
+              ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
 
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
@@ -77,6 +71,31 @@ public class ParquetRecordWriterWrapper 
     }
   }
 
+  private void initializeSerProperties(JobContext job, Properties tableProperties) {
+    String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
+    Configuration conf = ContextUtil.getConfiguration(job);
+    if (blockSize != null && !blockSize.isEmpty()) {
+      LOG.debug("get override parquet.block.size property via tblproperties");
+      conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
+    }
+
+    String enableDictionaryPage =
+      tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
+    if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) {
+      LOG.debug("get override parquet.enable.dictionary property via tblproperties");
+      conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY,
+        Boolean.valueOf(enableDictionaryPage));
+    }
+
+    String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+    if (compressionName != null && !compressionName.isEmpty()) {
+      //get override compression properties via "tblproperties" clause if it is set
+      LOG.debug("get override compression properties via tblproperties");
+      CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+      conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
+    }
+  }
+
   @Override
   public void close(final Reporter reporter) throws IOException {
     try {
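
The net effect of the refactor: instead of special-casing parquet.compression with a codec-specific getRecordWriter overload, all supported table properties are copied into the task Configuration up front, and the plain getRecordWriter(taskContext, path) call picks them up. A self-contained sketch of the override semantics (class name and values are illustrative, not from the patch):

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;

    import parquet.hadoop.ParquetOutputFormat;
    import parquet.hadoop.metadata.CompressionCodecName;

    // Hypothetical demo: table-level properties, when present and non-empty,
    // override whatever the job configuration already carries.
    public class SerPropertiesSketch {
      public static void main(String[] args) {
        Properties tableProperties = new Properties();
        tableProperties.setProperty(ParquetOutputFormat.COMPRESSION, "SNAPPY");
        tableProperties.setProperty(ParquetOutputFormat.BLOCK_SIZE, "134217728");

        Configuration conf = new Configuration();
        conf.set(ParquetOutputFormat.COMPRESSION, "GZIP"); // job-level setting

        String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
        if (compressionName != null && !compressionName.isEmpty()) {
          // fromConf validates and normalizes the codec name before it is applied
          CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
          conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
        }

        String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
        if (blockSize != null && !blockSize.isEmpty()) {
          conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
        }

        System.out.println(conf.get(ParquetOutputFormat.COMPRESSION));       // SNAPPY
        System.out.println(conf.getInt(ParquetOutputFormat.BLOCK_SIZE, -1)); // 134217728
      }
    }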

Modified: hive/trunk/ql/src/test/queries/clientpositive/create_like.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/create_like.q?rev=1639777&r1=1639776&r2=1639777&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/create_like.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/create_like.q Fri Nov 14 20:02:45 2014
@@ -65,8 +65,8 @@ DESCRIBE FORMATTED doctors;
 CREATE TABLE doctors2 like doctors;
 DESCRIBE FORMATTED doctors2;
 
-CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO");
-CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable;
+CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO");
+CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable;
 
-DESCRIBE FORMATTED LikeCompressedParquetTable;
+DESCRIBE FORMATTED LikePropertiedParquetTable;
 

Modified: hive/trunk/ql/src/test/results/clientpositive/create_like.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/create_like.q.out?rev=1639777&r1=1639776&r2=1639777&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/create_like.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/create_like.q.out Fri Nov 14 20:02:45 2014
@@ -405,28 +405,28 @@ Bucket Columns:     	[]                 
 Sort Columns:       	[]                  	 
 Storage Desc Params:	 	 
 	serialization.format	1                   
-PREHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@CompressedParquetTable
-POSTHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: Output: default@PropertiedParquetTable
+POSTHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@CompressedParquetTable
-PREHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+POSTHOOK: Output: default@PropertiedParquetTable
+PREHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@LikeCompressedParquetTable
-POSTHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+PREHOOK: Output: default@LikePropertiedParquetTable
+POSTHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@LikeCompressedParquetTable
-PREHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+POSTHOOK: Output: default@LikePropertiedParquetTable
+PREHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 PREHOOK: type: DESCTABLE
-PREHOOK: Input: default@likecompressedparquettable
-POSTHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+PREHOOK: Input: default@likepropertiedparquettable
+POSTHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 POSTHOOK: type: DESCTABLE
-POSTHOOK: Input: default@likecompressedparquettable
+POSTHOOK: Input: default@likepropertiedparquettable
 # col_name            	data_type           	comment             
 	 	 
 a                   	int