You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/13 01:05:27 UTC
parquet-mr git commit: PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.
Repository: parquet-mr
Updated Branches:
refs/heads/master de99127d7 -> 59ec4f018
PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.
BytesInput is not meant to be held and reused, but decompressed
dictionary pages are held and reused anyway. Reading such a dictionary
a second time fails because its underlying stream has already been
consumed, so the cleanest fix is to copy the bytes into memory once the
stream has been read.
Author: Ryan Blue <bl...@apache.org>
Closes #376 from rdblue/PARQUET-743-fix-reused-dictionaries and squashes the following commits:
28c0903 [Ryan Blue] PARQUET-743: Fix DictionaryFilter when dictionaries are reused.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/59ec4f01
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/59ec4f01
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/59ec4f01
Branch: refs/heads/master
Commit: 59ec4f018963eb55e32fafc2b924826c39c09682
Parents: de99127
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Oct 12 18:05:21 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Oct 12 18:05:21 2016 -0700
----------------------------------------------------------------------
.../parquet/hadoop/DictionaryPageReader.java | 19 ++++++++++++++++++-
.../dictionarylevel/DictionaryFilterTest.java | 4 ++--
2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 9a99358..2be7ffe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -19,6 +19,7 @@
package org.apache.parquet.hadoop;
import org.apache.parquet.Strings;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
@@ -93,7 +94,10 @@ class DictionaryPageReader implements DictionaryPageReadStore {
// check the cache again in case this thread waited on another reading the same page
if (!cache.containsKey(dotPath)) {
DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
- cache.put(dotPath, dict);
+ // copy the dictionary to ensure it can be reused if it is returned
+ // more than once. this can happen when a DictionaryFilter has two or
+ // more predicates for the same column.
+ cache.put(dotPath, reusableCopy(dict));
}
}
@@ -104,6 +108,19 @@ class DictionaryPageReader implements DictionaryPageReadStore {
}
}
+ private static DictionaryPage reusableCopy(DictionaryPage dict) {
+ if (dict == null) {
+ return null;
+ }
+ try {
+ return new DictionaryPage(
+ BytesInput.from(dict.getBytes().toByteArray()),
+ dict.getDictionarySize(), dict.getEncoding());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Cannot read dictionary", e);
+ }
+ }
+
private boolean hasDictionaryPage(ColumnChunkMetaData column) {
EncodingStats stats = column.getEncodingStats();
if (stats != null) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 7af0c40..eca6332 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -56,7 +56,7 @@ import java.util.UUID;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -118,7 +118,7 @@ public class DictionaryFilterTest {
SimpleGroupFactory f = new SimpleGroupFactory(schema);
ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
.withWriterVersion(PARQUET_1_0)
- .withCompressionCodec(UNCOMPRESSED)
+ .withCompressionCodec(GZIP)
.withRowGroupSize(1024*1024)
.withPageSize(1024)
.enableDictionaryEncoding()