Posted to commits@parquet.apache.org by ga...@apache.org on 2021/06/22 07:52:51 UTC
[parquet-mr] branch master updated: PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (#913)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new bab3d53 PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (#913)
bab3d53 is described below
commit bab3d53bff84a74743b2f62f5e394cbd9410b31f
Author: Kai Jiang <ji...@gmail.com>
AuthorDate: Tue Jun 22 00:52:43 2021 -0700
PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (#913)
* use a try-with-resources statement so ParquetFileReader.close() is always called
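
For context, a minimal before/after sketch of the pattern this commit fixes. It is illustrative only, not part of the patch; conf, file, and writer stand in for whatever the caller has in scope:

    // Before: the reader returned by open() is never closed, so the
    // underlying input stream (and, on HDFS, the TCP connection behind
    // it) leaks once appendTo() returns.
    ParquetFileReader.open(conf, file).appendTo(writer);

    // After: try-with-resources guarantees ParquetFileReader.close()
    // runs even if appendTo() throws.
    try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
      reader.appendTo(writer);
    }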
---
.../cli/commands/CheckParquet251Command.java | 6 +-
.../apache/parquet/cli/commands/SchemaCommand.java | 7 +-
.../cli/commands/ShowDictionaryCommand.java | 91 +++++------
.../parquet/cli/commands/ShowPagesCommand.java | 90 +++++------
.../apache/parquet/hadoop/ParquetFileWriter.java | 4 +-
.../parquet/hadoop/TestParquetFileWriter.java | 167 +++++++++++----------
.../apache/parquet/hadoop/TestParquetWriter.java | 17 ++-
.../parquet/hadoop/TestReadWriteEncodingStats.java | 49 +++---
8 files changed, 221 insertions(+), 210 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
index fbeebdf..d7aa82d 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
@@ -108,10 +108,8 @@ public class CheckParquet251Command extends BaseCommand {
}));
// now check to see if the data is actually corrupt
- ParquetFileReader reader = new ParquetFileReader(getConf(),
- fakeMeta, path, footer.getBlocks(), columns);
-
- try {
+ try (ParquetFileReader reader = new ParquetFileReader(getConf(),
+ fakeMeta, path, footer.getBlocks(), columns)) {
PageStatsValidator validator = new PageStatsValidator();
for (PageReadStore pages = reader.readNextRowGroup(); pages != null;
pages = reader.readNextRowGroup()) {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
index ca29dd0..988ab0f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -119,9 +119,10 @@ public class SchemaCommand extends BaseCommand {
switch (format) {
case PARQUET:
- return new ParquetFileReader(
- getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)
- .getFileMetaData().getSchema().toString();
+ try (ParquetFileReader reader = new ParquetFileReader(
+ getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) {
+ return reader.getFileMetaData().getSchema().toString();
+ }
default:
throw new IllegalArgumentException(String.format(
"Could not get a Parquet schema for format %s: %s", format, source));
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
index 20a694f..7a167ed 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -64,56 +64,57 @@ public class ShowDictionaryCommand extends BaseCommand {
String source = targets.get(0);
- ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
- MessageType schema = reader.getFileMetaData().getSchema();
- ColumnDescriptor descriptor = Util.descriptor(column, schema);
- PrimitiveType type = Util.primitive(column, schema);
- Preconditions.checkNotNull(type);
-
- DictionaryPageReadStore dictionaryReader;
- int rowGroup = 0;
- while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
- DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
-
- Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
-
- console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
- for (int i = 0; i <= dict.getMaxId(); i += 1) {
- switch(type.getPrimitiveTypeName()) {
- case BINARY:
- if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+ try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+ MessageType schema = reader.getFileMetaData().getSchema();
+ ColumnDescriptor descriptor = Util.descriptor(column, schema);
+ PrimitiveType type = Util.primitive(column, schema);
+ Preconditions.checkNotNull(type);
+
+ DictionaryPageReadStore dictionaryReader;
+ int rowGroup = 0;
+ while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
+ DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
+
+ Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
+
+ console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
+ for (int i = 0; i <= dict.getMaxId(); i += 1) {
+ switch(type.getPrimitiveTypeName()) {
+ case BINARY:
+ if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+ console.info("{}: {}", String.format("%6d", i),
+ Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
+ } else {
+ console.info("{}: {}", String.format("%6d", i),
+ Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
+ }
+ break;
+ case INT32:
console.info("{}: {}", String.format("%6d", i),
- Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
- } else {
+ dict.decodeToInt(i));
+ break;
+ case INT64:
console.info("{}: {}", String.format("%6d", i),
- Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
- }
- break;
- case INT32:
- console.info("{}: {}", String.format("%6d", i),
- dict.decodeToInt(i));
- break;
- case INT64:
- console.info("{}: {}", String.format("%6d", i),
- dict.decodeToLong(i));
- break;
- case FLOAT:
- console.info("{}: {}", String.format("%6d", i),
- dict.decodeToFloat(i));
- break;
- case DOUBLE:
- console.info("{}: {}", String.format("%6d", i),
- dict.decodeToDouble(i));
- break;
- default:
- throw new IllegalArgumentException(
- "Unknown dictionary type: " + type.getPrimitiveTypeName());
+ dict.decodeToLong(i));
+ break;
+ case FLOAT:
+ console.info("{}: {}", String.format("%6d", i),
+ dict.decodeToFloat(i));
+ break;
+ case DOUBLE:
+ console.info("{}: {}", String.format("%6d", i),
+ dict.decodeToDouble(i));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown dictionary type: " + type.getPrimitiveTypeName());
+ }
}
- }
- reader.skipNextRowGroup();
+ reader.skipNextRowGroup();
- rowGroup += 1;
+ rowGroup += 1;
+ }
}
console.info("");
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index 5832106..bf030ac 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -75,57 +75,57 @@ public class ShowPagesCommand extends BaseCommand {
"Cannot process multiple Parquet files.");
String source = targets.get(0);
- ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
-
- MessageType schema = reader.getFileMetaData().getSchema();
- Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
- if (this.columns == null || this.columns.isEmpty()) {
- for (ColumnDescriptor descriptor : schema.getColumns()) {
- columns.put(descriptor, primitive(schema, descriptor.getPath()));
- }
- } else {
- for (String column : this.columns) {
- columns.put(descriptor(column, schema), primitive(column, schema));
- }
- }
-
- CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
- // accumulate formatted lines to print by column
- Map<String, List<String>> formatted = Maps.newLinkedHashMap();
- PageFormatter formatter = new PageFormatter();
- PageReadStore pageStore;
- int rowGroupNum = 0;
- while ((pageStore = reader.readNextRowGroup()) != null) {
- for (ColumnDescriptor descriptor : columns.keySet()) {
- List<String> lines = formatted.get(columnName(descriptor));
- if (lines == null) {
- lines = Lists.newArrayList();
- formatted.put(columnName(descriptor), lines);
+ try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+ MessageType schema = reader.getFileMetaData().getSchema();
+ Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
+ if (this.columns == null || this.columns.isEmpty()) {
+ for (ColumnDescriptor descriptor : schema.getColumns()) {
+ columns.put(descriptor, primitive(schema, descriptor.getPath()));
}
-
- formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
- PageReader pages = pageStore.getPageReader(descriptor);
-
- DictionaryPage dict = pages.readDictionaryPage();
- if (dict != null) {
- lines.add(formatter.format(dict));
+ } else {
+ for (String column : this.columns) {
+ columns.put(descriptor(column, schema), primitive(column, schema));
}
- DataPage page;
- while ((page = pages.readPage()) != null) {
- lines.add(formatter.format(page));
+ }
+
+ CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
+ // accumulate formatted lines to print by column
+ Map<String, List<String>> formatted = Maps.newLinkedHashMap();
+ PageFormatter formatter = new PageFormatter();
+ PageReadStore pageStore;
+ int rowGroupNum = 0;
+ while ((pageStore = reader.readNextRowGroup()) != null) {
+ for (ColumnDescriptor descriptor : columns.keySet()) {
+ List<String> lines = formatted.get(columnName(descriptor));
+ if (lines == null) {
+ lines = Lists.newArrayList();
+ formatted.put(columnName(descriptor), lines);
+ }
+
+ formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+ PageReader pages = pageStore.getPageReader(descriptor);
+
+ DictionaryPage dict = pages.readDictionaryPage();
+ if (dict != null) {
+ lines.add(formatter.format(dict));
+ }
+ DataPage page;
+ while ((page = pages.readPage()) != null) {
+ lines.add(formatter.format(page));
+ }
}
+ rowGroupNum += 1;
}
- rowGroupNum += 1;
- }
- // TODO: Show total column size and overall size per value in the column summary line
- for (String columnName : formatted.keySet()) {
- console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
- console.info(formatter.getHeader());
- for (String line : formatted.get(columnName)) {
- console.info(line);
+ // TODO: Show total column size and overall size per value in the column summary line
+ for (String columnName : formatted.keySet()) {
+ console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
+ console.info(formatter.getHeader());
+ for (String line : formatted.get(columnName)) {
+ console.info(line);
+ }
+ console.info("");
}
- console.info("");
}
return 0;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a246a52..afbdf76 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -887,7 +887,9 @@ public class ParquetFileWriter {
*/
@Deprecated
public void appendFile(Configuration conf, Path file) throws IOException {
- ParquetFileReader.open(conf, file).appendTo(this);
+ try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
+ reader.appendTo(this);
+ }
}
public void appendFile(InputFile file) throws IOException {
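
Illustrative only, not part of the diff: a hedged sketch of a caller exercising the patched path. The schema and the two paths are assumed to exist; the deprecated Path-based overload is used because it is the one fixed above. Note that w.end(new HashMap<>()) mirrors the usage already present in the tests below.

    import java.util.HashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.schema.MessageType;

    // Hypothetical caller: schema, outPath, and inPath are assumed
    // to be defined elsewhere.
    Configuration conf = new Configuration();
    ParquetFileWriter w = new ParquetFileWriter(conf, schema, outPath);
    w.start();
    // Internally opens a ParquetFileReader; after this fix it is
    // closed via try-with-resources instead of being leaked.
    w.appendFile(conf, inPath);
    w.end(new HashMap<>());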
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 73ef70e..5b8c5ed 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -204,35 +204,37 @@ public class TestParquetFileWriter {
assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
- Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- assertNull(r.readNextRowGroup());
+ try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ assertNull(r.readNextRowGroup());
+ }
}
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
- readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
- pages = r.readNextRowGroup();
- assertEquals(4, pages.getRowCount());
+ pages = r.readNextRowGroup();
+ assertEquals(4, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
- validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+ validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+ validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
- assertNull(r.readNextRowGroup());
+ assertNull(r.readNextRowGroup());
+ }
}
PrintFooter.main(new String[] {path.toString()});
}
@@ -281,12 +283,14 @@ public class TestParquetFileWriter {
w.endBlock();
w.end(new HashMap<>());
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
- ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
- Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
- BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
- BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
- assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
- assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+
+ try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)))) {
+ BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+ BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+ }
}
@Test
@@ -340,16 +344,16 @@ public class TestParquetFileWriter {
expectedEncoding.add(PLAIN);
assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
- ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
- readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
-
- PageReadStore pages = reader.readNextRowGroup();
- assertEquals(14, pages.getRowCount());
- validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
- validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(),data.toByteArray(), 12);
- validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
- validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
- assertNull(reader.readNextRowGroup());
+ try (ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
+ PageReadStore pages = reader.readNextRowGroup();
+ assertEquals(14, pages.getRowCount());
+ validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+ assertNull(reader.readNextRowGroup());
+ }
}
@Test
@@ -426,35 +430,37 @@ public class TestParquetFileWriter {
120, readFooter.getBlocks().get(1).getStartingPos());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
- Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- assertNull(r.readNextRowGroup());
+ try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ assertNull(r.readNextRowGroup());
+ }
}
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
- readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
- pages = r.readNextRowGroup();
- assertEquals(4, pages.getRowCount());
+ pages = r.readNextRowGroup();
+ assertEquals(4, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
- validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+ validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+ validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
- assertNull(r.readNextRowGroup());
+ assertNull(r.readNextRowGroup());
+ }
}
PrintFooter.main(new String[] {path.toString()});
}
@@ -533,35 +539,36 @@ public class TestParquetFileWriter {
109, readFooter.getBlocks().get(1).getStartingPos());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
- Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- assertNull(r.readNextRowGroup());
+ try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ assertNull(r.readNextRowGroup());
+ }
}
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
- readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
+ PageReadStore pages = r.readNextRowGroup();
+ assertEquals(3, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+ validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+ validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
- PageReadStore pages = r.readNextRowGroup();
- assertEquals(3, pages.getRowCount());
- validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
- validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
- validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+ pages = r.readNextRowGroup();
+ assertEquals(4, pages.getRowCount());
- pages = r.readNextRowGroup();
- assertEquals(4, pages.getRowCount());
+ validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+ validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
- validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
- validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
-
- assertNull(r.readNextRowGroup());
+ assertNull(r.readNextRowGroup());
+ }
}
PrintFooter.main(new String[] {path.toString()});
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 9e9b735..b2ae72a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -270,14 +270,15 @@ public class TestParquetWriter {
}
}
- ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()));
- BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
- BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
- .readBloomFilter(blockMetaData.getColumns().get(0));
-
- for (String name: testNames) {
- assertTrue(bloomFilter.findHash(
- LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+ BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+ BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+ .readBloomFilter(blockMetaData.getColumns().get(0));
+
+ for (String name : testNames) {
+ assertTrue(bloomFilter.findHash(
+ LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+ }
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
index 69e11c1..fdb7c86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
@@ -93,29 +93,30 @@ public class TestReadWriteEncodingStats {
writeData(writer);
writer.close();
- ParquetFileReader reader = ParquetFileReader.open(CONF, path);
- assertEquals("Should have one row group", 1, reader.getRowGroups().size());
- BlockMetaData rowGroup = reader.getRowGroups().get(0);
-
- ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0);
- EncodingStats dictStats = dictColumn.getEncodingStats();
- assertNotNull("Dict column should have non-null encoding stats", dictStats);
- assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages());
- assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages());
- assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages());
-
- ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1);
- EncodingStats plainStats = plainColumn.getEncodingStats();
- assertNotNull("Plain column should have non-null encoding stats", plainStats);
- assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages());
- assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages());
- assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages());
-
- ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2);
- EncodingStats fallbackStats = fallbackColumn.getEncodingStats();
- assertNotNull("Fallback column should have non-null encoding stats", fallbackStats);
- assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages());
- assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages());
- assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages());
+ try (ParquetFileReader reader = ParquetFileReader.open(CONF, path)) {
+ assertEquals("Should have one row group", 1, reader.getRowGroups().size());
+ BlockMetaData rowGroup = reader.getRowGroups().get(0);
+
+ ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0);
+ EncodingStats dictStats = dictColumn.getEncodingStats();
+ assertNotNull("Dict column should have non-null encoding stats", dictStats);
+ assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages());
+ assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages());
+ assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages());
+
+ ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1);
+ EncodingStats plainStats = plainColumn.getEncodingStats();
+ assertNotNull("Plain column should have non-null encoding stats", plainStats);
+ assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages());
+ assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages());
+ assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages());
+
+ ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2);
+ EncodingStats fallbackStats = fallbackColumn.getEncodingStats();
+ assertNotNull("Fallback column should have non-null encoding stats", fallbackStats);
+ assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages());
+ assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages());
+ assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages());
+ }
}
}