You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/02/11 02:41:09 UTC
drill git commit: DRILL-2154: Add options to enable compression and
dictionary encoding in parquet writer
Repository: drill
Updated Branches:
refs/heads/master 3d863b5eb -> 00c08eff2
DRILL-2154: Add options to enable compression and dictionary encoding in parquet writer
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/00c08eff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/00c08eff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/00c08eff
Branch: refs/heads/master
Commit: 00c08eff2a5de6e4334fb57e952aac9a852c3d37
Parents: 3d863b5
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Feb 6 13:00:36 2015 -0800
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Tue Feb 10 15:10:54 2015 -0800
----------------------------------------------------------------------
exec/java-exec/pom.xml | 2 +-
.../org/apache/drill/exec/ExecConstants.java | 7 ++++++
.../server/options/SystemOptionManager.java | 2 ++
.../exec/server/options/TypeValidators.java | 25 ++++++++++++++++++++
.../exec/store/parquet/ParquetFormatPlugin.java | 6 +++++
.../exec/store/parquet/ParquetRecordWriter.java | 20 ++++++++++++++++
.../physical/impl/writer/TestParquetWriter.java | 18 ++++++++++++++
7 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f5d9962..06f60fb 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -131,7 +131,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
- <version>1.5.1-drill-r5</version>
+ <version>1.5.1-drill-r6</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f48b06d..5efcce8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.server.options.OptionValidator;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
+import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator;
import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
@@ -85,6 +86,12 @@ public interface ExecConstants {
public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+ public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression";
+ public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator(
+ PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "snappy", "gzip", "none");
+ public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding";
+ public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator(
+ PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
public static final String PARQUET_VECTOR_FILL_THRESHOLD = "store.parquet.vector_fill_threshold";
public static final OptionValidator PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_THRESHOLD, 99l, 85l);
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index f20627d..aa0a5ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -57,6 +57,8 @@ public class SystemOptionManager implements OptionManager {
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+ ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR,
+ ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index 14be4b0..e53fcfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.server.options;
import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.drill.common.exceptions.ExpressionParsingException;
import org.apache.drill.exec.server.options.OptionValue.Kind;
@@ -133,6 +135,29 @@ public class TypeValidators {
}
+  /**
+   * Validator that checks if the given value is included in a list of acceptable values. Case insensitive.
+   *
+   * <p>Accepted values are stored lower-cased, and the candidate value is lower-cased before the lookup.
+   * Both conversions use {@link java.util.Locale#ROOT} so the comparison does not depend on the JVM's
+   * default locale (under a Turkish locale, for example, "GZIP" would otherwise lower-case to "gzıp"
+   * and fail to match "gzip").
+   */
+  public static class EnumeratedStringValidator extends StringValidator {
+    Set<String> valuesSet = new HashSet<>();
+
+    /**
+     * @param name   option name, passed through to {@link StringValidator}
+     * @param def    default value, passed through to {@link StringValidator}
+     * @param values the acceptable values; matched case-insensitively
+     */
+    public EnumeratedStringValidator(String name, String def, String... values) {
+      super(name, def);
+      for (String value : values) {
+        valuesSet.add(value.toLowerCase(java.util.Locale.ROOT));
+      }
+    }
+
+    /**
+     * Runs the parent validation, then rejects any value that is not one of the accepted values.
+     *
+     * @throws ExpressionParsingException if the lower-cased value is not in the accepted set
+     */
+    @Override
+    public void validate(OptionValue v) throws ExpressionParsingException {
+      super.validate(v);
+      if (!valuesSet.contains(v.string_val.toLowerCase(java.util.Locale.ROOT))) {
+        throw new ExpressionParsingException(String.format("Option %s must be one of: %s", getOptionName(), valuesSet));
+      }
+    }
+
+  }
+
public static abstract class TypeValidator extends OptionValidator {
final Kind kind;
private OptionValue defaultValue;
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index ab864db..e204a2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -132,6 +132,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
+ options.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+ context.getOptions().getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val);
+
+ options.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
+ context.getOptions().getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString());
+
RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
recordWriter.init(options);
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index cdb4ba0..8bf9a92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -108,6 +108,26 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
+ String codecName = writerOptions.get(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).toLowerCase();
+ switch(codecName) {
+ case "snappy":
+ codec = CompressionCodecName.SNAPPY;
+ break;
+ case "lzo":
+ codec = CompressionCodecName.LZO;
+ break;
+ case "gzip":
+ codec = CompressionCodecName.GZIP;
+ break;
+ case "none":
+ case "uncompressed":
+ codec = CompressionCodecName.UNCOMPRESSED;
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unknown compression type: %s", codecName));
+ }
+
+ enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/00c08eff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index dc80afe..6aa3288 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -152,6 +152,24 @@ public class TestParquetWriter extends BaseTestQuery {
runTestAndValidate("*", "*", inputTable, "supplier_parquet");
}
+  /**
+   * Runs the read/write validation on the TPC-H supplier table with dictionary encoding disabled and the
+   * parquet writer compression option set to 'none'.
+   */
+  @Test
+  public void testTPCHReadWriteNoDictUncompressed() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = false;", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+      test(String.format("alter session set `%s` = 'none'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+      String inputTable = "cp.`tpch/supplier.parquet`";
+      runTestAndValidate("*", "*", inputTable, "supplier_parquet_no_dict_uncompressed");
+    } finally {
+      // Restore both options even when validation fails, so the modified settings do not outlive this test.
+      test(String.format("alter session set `%s` = true;", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+      test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+    }
+  }
+
+  /**
+   * Runs the read/write validation on the TPC-H supplier table with the parquet writer compression option
+   * set to 'gzip'; the dictionary encoding option is left untouched.
+   */
+  @Test
+  public void testTPCHReadWriteDictGzip() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+      String inputTable = "cp.`tpch/supplier.parquet`";
+      runTestAndValidate("*", "*", inputTable, "supplier_parquet_dict_gzip");
+    } finally {
+      // Restore the compression option even when validation fails, so 'gzip' does not outlive this test.
+      test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
+    }
+  }
+
// working to create an exhaustive test of the format for this one. including all convertedTypes
// will not be supporting interval for Beta as of current schedule
// Types left out: