You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/02/11 02:41:09 UTC

drill git commit: DRILL-2154: Add options to enable compression and dictionary encoding in parquet writer

Repository: drill
Updated Branches:
  refs/heads/master 3d863b5eb -> 00c08eff2


DRILL-2154: Add options to enable compression and dictionary encoding in parquet writer


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/00c08eff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/00c08eff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/00c08eff

Branch: refs/heads/master
Commit: 00c08eff2a5de6e4334fb57e952aac9a852c3d37
Parents: 3d863b5
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Feb 6 13:00:36 2015 -0800
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Tue Feb 10 15:10:54 2015 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  2 +-
 .../org/apache/drill/exec/ExecConstants.java    |  7 ++++++
 .../server/options/SystemOptionManager.java     |  2 ++
 .../exec/server/options/TypeValidators.java     | 25 ++++++++++++++++++++
 .../exec/store/parquet/ParquetFormatPlugin.java |  6 +++++
 .../exec/store/parquet/ParquetRecordWriter.java | 20 ++++++++++++++++
 .../physical/impl/writer/TestParquetWriter.java | 18 ++++++++++++++
 7 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f5d9962..06f60fb 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -131,7 +131,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.1-drill-r5</version>
+      <version>1.5.1-drill-r6</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f48b06d..5efcce8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
 import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
+import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator;
 import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
@@ -85,6 +86,12 @@ public interface ExecConstants {
   public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
   public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
   public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+  public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression";
+  public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator(
+      PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "snappy", "gzip", "none");
+  public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding";
+  public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator(
+      PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
 
   public static final String PARQUET_VECTOR_FILL_THRESHOLD = "store.parquet.vector_fill_threshold";
   public static final OptionValidator PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_THRESHOLD, 99l, 85l);

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index f20627d..aa0a5ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -57,6 +57,8 @@ public class SystemOptionManager implements OptionManager {
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR,
+      ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index 14be4b0..e53fcfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.server.options;
 
 import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.drill.common.exceptions.ExpressionParsingException;
 import org.apache.drill.exec.server.options.OptionValue.Kind;
@@ -133,6 +135,29 @@ public class TypeValidators {
 
   }
 
+  /**
+   * Validator that checks if the given value is included in a list of acceptable values. Case insensitive.
+   */
+  public static class EnumeratedStringValidator extends StringValidator {
+    Set<String> valuesSet = new HashSet<>();
+
+    public EnumeratedStringValidator(String name, String def, String... values) {
+      super(name, def);
+      for (String value : values) {
+        valuesSet.add(value.toLowerCase());
+      }
+    }
+
+    @Override
+    public void validate(OptionValue v) throws ExpressionParsingException {
+      super.validate(v);
+      if (!valuesSet.contains(v.string_val.toLowerCase())) {
+        throw new ExpressionParsingException(String.format("Option %s must be one of: %s", getOptionName(), valuesSet));
+      }
+    }
+
+  }
+
   public static abstract class TypeValidator extends OptionValidator {
     final Kind kind;
     private OptionValue defaultValue;

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index ab864db..e204a2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -132,6 +132,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
 
+    options.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val);
+
+    options.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
+        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString());
+
     RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
     recordWriter.init(options);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index cdb4ba0..8bf9a92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -108,6 +108,26 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
+    String codecName = writerOptions.get(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).toLowerCase();
+    switch(codecName) {
+    case "snappy":
+      codec = CompressionCodecName.SNAPPY;
+      break;
+    case "lzo":
+      codec = CompressionCodecName.LZO;
+      break;
+    case "gzip":
+      codec = CompressionCodecName.GZIP;
+      break;
+    case "none":
+    case "uncompressed":
+      codec = CompressionCodecName.UNCOMPRESSED;
+      break;
+    default:
+      throw new UnsupportedOperationException(String.format("Unknown compression type: %s", codecName));
+    }
+
+    enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index dc80afe..6aa3288 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -152,6 +152,24 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate("*", "*", inputTable, "supplier_parquet");
   }
 
+  @Test
+  public void testTPCHReadWriteNoDictUncompressed() throws Exception {
+    test(String.format("alter session set `%s` = false;", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+    test(String.format("alter session set `%s` = 'none'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+    String inputTable = "cp.`tpch/supplier.parquet`";
+    runTestAndValidate("*", "*", inputTable, "supplier_parquet_no_dict_uncompressed");
+    test(String.format("alter session set `%s` = true;", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+    test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+  }
+
+  @Test
+  public void testTPCHReadWriteDictGzip() throws Exception {
+    test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+    String inputTable = "cp.`tpch/supplier.parquet`";
+    runTestAndValidate("*", "*", inputTable, "supplier_parquet_dict_gzip");
+    test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+  }
+
   // working to create an exhaustive test of the format for this one. including all convertedTypes
   // will not be supporting interval for Beta as of current schedule
   // Types left out: