You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2014/09/04 20:28:14 UTC

git commit: PARQUET-63: Enable dictionary encoding for FIXED.

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master f8b06df7a -> 647b8a70f


PARQUET-63: Enable dictionary encoding for FIXED.

This uses the existing dictionary support introduced for int96. Encoding
and ParquetProperties have been updated to use the dictionary-supporting
classes when dictionary encoding is requested for a write, or when a
dictionary is present during a read. This also fixes a bug in the fixed
dictionary values writer, where the value length was hard-coded to 12
bytes (the length of an int96).

Author: Ryan Blue <rb...@cloudera.com>

Closes #30 from rdblue/PARQUET-63-add-fixed-dictionary-support and squashes the following commits:

bc34a34 [Ryan Blue] PARQUET-63: Enable dictionary encoding for FIXED.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/647b8a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/647b8a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/647b8a70

Branch: refs/heads/master
Commit: 647b8a70f9b7c94cabf9a7ec7bce2e7cbbb4c05b
Parents: f8b06df
Author: Ryan Blue <rb...@cloudera.com>
Authored: Thu Sep 4 11:28:03 2014 -0700
Committer: julien <ju...@twitter.com>
Committed: Thu Sep 4 11:28:03 2014 -0700

----------------------------------------------------------------------
 .../src/main/java/parquet/column/Encoding.java  |  3 ++
 .../java/parquet/column/ParquetProperties.java  |  6 +++-
 .../dictionary/DictionaryValuesWriter.java      |  2 +-
 .../example/data/simple/SimpleGroup.java        |  1 +
 .../parquet/io/ValidatingRecordConsumer.java    | 33 +++++++++++---------
 .../src/test/java/parquet/io/TestColumnIO.java  |  4 ++-
 6 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/main/java/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/Encoding.java b/parquet-column/src/main/java/parquet/column/Encoding.java
index 890ec44..57af085 100644
--- a/parquet-column/src/main/java/parquet/column/Encoding.java
+++ b/parquet-column/src/main/java/parquet/column/Encoding.java
@@ -112,6 +112,7 @@ public enum Encoding {
     public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
       switch (descriptor.getType()) {
       case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
       case INT96:
       case INT64:
       case DOUBLE:
@@ -128,6 +129,8 @@ public enum Encoding {
       switch (descriptor.getType()) {
       case BINARY:
         return new PlainBinaryDictionary(dictionaryPage);
+      case FIXED_LEN_BYTE_ARRAY:
+        return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
       case INT96:
         return new PlainBinaryDictionary(dictionaryPage, 12);
       case INT64:

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
index 5c1b74c..26b900d 100644
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
@@ -119,7 +119,11 @@ public class ParquetProperties {
         return new PlainValuesWriter(initialSizePerCol);
       }
     case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
+      if (enableDictionary && (writerVersion == WriterVersion.PARQUET_2_0)) {
+        return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, path.getTypeLength());
+      } else {
+        return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
+      }
     default:
       return new PlainValuesWriter(initialSizePerCol);
     }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 0e76c47..4379360 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -364,7 +364,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     public DictionaryPage createDictionaryPage() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(12, lastUsedDictionaryByteSize);
+        FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize);
         Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java b/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java
index 8403a04..76c84e4 100644
--- a/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java
+++ b/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java
@@ -179,6 +179,7 @@ public class SimpleGroup extends Group {
   public void add(int fieldIndex, Binary value) {
     switch (getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) {
       case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
         add(fieldIndex, new BinaryValue(value));
         break;
       case INT96:

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java
index 88d9502..5588512 100644
--- a/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java
+++ b/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java
@@ -27,6 +27,7 @@ import parquet.schema.Type;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 
 /**
  * Wraps a record consumer
@@ -141,15 +142,15 @@ public class ValidatingRecordConsumer extends RecordConsumer {
         throw new InvalidRecordException("unknown repetition " + currentType.getRepetition() + " in " + currentType);
     }
     if (!currentType.isPrimitive() || currentType.asPrimitiveType().getPrimitiveTypeName() != p) {
-      throw new InvalidRecordException("expected type " + currentType + " but got "+ p);
+      throw new InvalidRecordException("expected type " + p + " but got "+ currentType);
     }
   }
 
-  private void validate(PrimitiveTypeName p1, PrimitiveTypeName p2) {
+  private void validate(PrimitiveTypeName... ptypes) {
     Type currentType = types.peek().asGroupType().getType(fields.peek());
     int c = fieldValueCount.pop() + 1;
     fieldValueCount.push(c);
-    if (DEBUG) LOG.debug("validate " + p1 + ", " + p2 + " for " + currentType.getName());
+    if (DEBUG) LOG.debug("validate " + Arrays.toString(ptypes) + " for " + currentType.getName());
     switch (currentType.getRepetition()) {
       case OPTIONAL:
       case REQUIRED:
@@ -162,19 +163,24 @@ public class ValidatingRecordConsumer extends RecordConsumer {
       default:
         throw new InvalidRecordException("unknown repetition " + currentType.getRepetition() + " in " + currentType);
     }
-    if (!currentType.isPrimitive() ||
-        (currentType.asPrimitiveType().getPrimitiveTypeName() != p1 &&
-         currentType.asPrimitiveType().getPrimitiveTypeName() != p2)) {
+    if (!currentType.isPrimitive()) {
       throw new InvalidRecordException(
-          "expected type " + currentType + " but got " + p1 + " or " + p2);
+          "expected type in " + Arrays.toString(ptypes) + " but got " + currentType);
     }
+    for (PrimitiveTypeName p : ptypes) {
+      if (currentType.asPrimitiveType().getPrimitiveTypeName() == p) {
+        return; // type is valid
+      }
+    }
+    throw new InvalidRecordException(
+        "expected type in " + Arrays.toString(ptypes) + " but got " + currentType);
   }
 
   /**
    * {@inheritDoc}
    */
   public void addInteger(int value) {
-    validate(PrimitiveTypeName.INT32);
+    validate(INT32);
     delegate.addInteger(value);
   }
 
@@ -182,7 +188,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
    * {@inheritDoc}
    */
   public void addLong(long value) {
-    validate(PrimitiveTypeName.INT64);
+    validate(INT64);
     delegate.addLong(value);
   }
 
@@ -190,7 +196,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
    * {@inheritDoc}
    */
   public void addBoolean(boolean value) {
-    validate(PrimitiveTypeName.BOOLEAN);
+    validate(BOOLEAN);
     delegate.addBoolean(value);
   }
 
@@ -198,8 +204,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
    * {@inheritDoc}
    */
   public void addBinary(Binary value) {
-    // TODO: this is used for FIXED also
-    validate(PrimitiveTypeName.BINARY, PrimitiveTypeName.INT96);
+    validate(BINARY, INT96, FIXED_LEN_BYTE_ARRAY);
     delegate.addBinary(value);
   }
 
@@ -207,7 +212,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
    * {@inheritDoc}
    */
   public void addFloat(float value) {
-    validate(PrimitiveTypeName.FLOAT);
+    validate(FLOAT);
     delegate.addFloat(value);
   }
 
@@ -215,7 +220,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
    * {@inheritDoc}
    */
   public void addDouble(double value) {
-    validate(PrimitiveTypeName.DOUBLE);
+    validate(DOUBLE);
     delegate.addDouble(value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/647b8a70/parquet-column/src/test/java/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
index 743c97a..dddbcca 100644
--- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
@@ -80,6 +80,7 @@ public class TestColumnIO {
   + "  required boolean e;\n"
   + "  required binary f;\n"
   + "  required int96 g;\n"
+  + "  required fixed_len_byte_array(3) h;\n"
   + "}\n";
 
   private static final String schemaString =
@@ -363,7 +364,8 @@ public class TestColumnIO {
         .append("d", 4.0d)
         .append("e", true)
         .append("f", Binary.fromString("6"))
-        .append("g", new NanoTime(1234, System.currentTimeMillis() * 1000));
+        .append("g", new NanoTime(1234, System.currentTimeMillis() * 1000))
+        .append("h", Binary.fromString("abc"));
 
     testSchema(oneOfEachSchema, Arrays.asList(g1));
   }