Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/13 01:05:27 UTC

parquet-mr git commit: PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.

Repository: parquet-mr
Updated Branches:
  refs/heads/master de99127d7 -> 59ec4f018


PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.

BytesInput is not supposed to be held and reused, but decompressed
dictionary pages did exactly that: the cached page wrapped a BytesInput
backed by the decompression stream, so reading the dictionary a second
time failed once that stream had been consumed. The cleanest fix is to
copy the bytes into memory when the stream is first read, so the cached
page can safely be returned more than once.
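
A minimal sketch of the failure mode and of the copy the patch makes
below; the reader variable and the repeated reads are illustrative, but
the DictionaryPage and BytesInput calls match the fix (toByteArray() can
throw IOException, which the patch wraps):

    // A decompressed dictionary page wraps a BytesInput backed by the
    // one-shot decompression stream, so its bytes can be read only once.
    DictionaryPage dict = reader.readDictionary(column); // hypothetical reader

    // Materializing the bytes into an array up front, as the patch does,
    // yields a page the cache can hand out any number of times.
    DictionaryPage reusable = new DictionaryPage(
        BytesInput.from(dict.getBytes().toByteArray()),
        dict.getDictionarySize(), dict.getEncoding());
    reusable.getBytes().toByteArray(); // safe: backed by a byte array
    reusable.getBytes().toByteArray(); // still safe on a second read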

Author: Ryan Blue <bl...@apache.org>

Closes #376 from rdblue/PARQUET-743-fix-reused-dictionaries and squashes the following commits:

28c0903 [Ryan Blue] PARQUET-743: Fix DictionaryFilter when dictionaries are reused.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/59ec4f01
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/59ec4f01
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/59ec4f01

Branch: refs/heads/master
Commit: 59ec4f018963eb55e32fafc2b924826c39c09682
Parents: de99127
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Oct 12 18:05:21 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Oct 12 18:05:21 2016 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/DictionaryPageReader.java     | 19 ++++++++++++++++++-
 .../dictionarylevel/DictionaryFilterTest.java    |  4 ++--
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 9a99358..2be7ffe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop;
 
 import org.apache.parquet.Strings;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
@@ -93,7 +94,10 @@ class DictionaryPageReader implements DictionaryPageReadStore {
         // check the cache again in case this thread waited on another reading the same page
         if (!cache.containsKey(dotPath)) {
           DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
-          cache.put(dotPath, dict);
+          // copy the dictionary to ensure it can be reused if it is returned
+          // more than once. this can happen when a DictionaryFilter has two or
+          // more predicates for the same column.
+          cache.put(dotPath, reusableCopy(dict));
         }
       }
 
@@ -104,6 +108,19 @@ class DictionaryPageReader implements DictionaryPageReadStore {
     }
   }
 
+  private static DictionaryPage reusableCopy(DictionaryPage dict) {
+    if (dict == null) {
+      return null;
+    }
+    try {
+      return new DictionaryPage(
+          BytesInput.from(dict.getBytes().toByteArray()),
+          dict.getDictionarySize(), dict.getEncoding());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Cannot read dictionary", e);
+    }
+  }
+
   private boolean hasDictionaryPage(ColumnChunkMetaData column) {
     EncodingStats stats = column.getEncodingStats();
     if (stats != null) {

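The comment added in the hunk above names the trigger: a DictionaryFilter
with two or more predicates on the same column asks the cache for the same
page more than once. A hedged sketch of such a predicate, built only from
the FilterApi calls the test below statically imports (the column name and
bounds are illustrative):

    // Both comparisons consult the same column's dictionary, so the
    // DictionaryPageReader cache returns the page twice per row group.
    FilterPredicate pred = and(
        gt(binaryColumn("binary_field"), Binary.fromString("a")),
        lt(binaryColumn("binary_field"), Binary.fromString("f")));
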
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 7af0c40..eca6332 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -56,7 +56,7 @@ import java.util.UUID;
 import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
 import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -118,7 +118,7 @@ public class DictionaryFilterTest {
     SimpleGroupFactory f = new SimpleGroupFactory(schema);
     ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
         .withWriterVersion(PARQUET_1_0)
-        .withCompressionCodec(UNCOMPRESSED)
+        .withCompressionCodec(GZIP)
         .withRowGroupSize(1024*1024)
         .withPageSize(1024)
         .enableDictionaryEncoding()
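
For context, the switch from UNCOMPRESSED to GZIP above is what makes the
test exercise the bug: only compressed dictionary pages go through the
decompression path that produced a one-shot BytesInput. A hedged sketch of
the read side that would hit the cache twice, assuming the ParquetFileReader
APIs of this era and the predicate sketched earlier (variable names are
illustrative):

    ParquetFileReader reader = ParquetFileReader.open(conf, file);
    BlockMetaData block = reader.getFooter().getBlocks().get(0);
    DictionaryPageReadStore dictionaries = reader.getDictionaryReader(block);

    // Before this patch, the second call re-read the cached, decompressed
    // dictionary page whose underlying stream was already consumed.
    canDrop(pred, block.getColumns(), dictionaries);
    canDrop(pred, block.getColumns(), dictionaries);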