You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/08/03 05:39:31 UTC
[parquet-mr] branch column-indexes updated: PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/column-indexes by this push:
new 43ac3e1 PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)
43ac3e1 is described below
commit 43ac3e18aeb10a421657480da9d9bd2a862fdd3c
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Aug 3 07:39:28 2018 +0200
PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)
---
.../apache/parquet/column/impl/ColumnWriteStoreBase.java | 4 ++--
.../org/apache/parquet/column/impl/ColumnWriterBase.java | 15 ++++++++-------
.../apache/parquet/column/impl/TestColumnReaderImpl.java | 8 ++++----
.../java/org/apache/parquet/column/mem/TestMemColumn.java | 5 ++++-
4 files changed, 18 insertions(+), 14 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d04192f..5cd7d87 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -149,7 +149,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
for (ColumnWriterBase memColumn : columns.values()) {
long rows = rowCount - memColumn.getRowsWrittenSoFar();
if (rows > 0) {
- memColumn.writePage(rowCount);
+ memColumn.writePage();
}
memColumn.finalizeColumnChunk();
}
@@ -195,7 +195,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
long rows = rowCount - writer.getRowsWrittenSoFar();
long remainingMem = props.getPageSizeThreshold() - usedMem;
if (remainingMem <= thresholdTolerance) {
- writer.writePage(rowCount);
+ writer.writePage();
remainingMem = props.getPageSizeThreshold();
}
long rowsToFillPage =
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 16085bb..3788c82 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -20,7 +20,6 @@ package org.apache.parquet.column.impl;
import java.io.IOException;
-import org.apache.parquet.Ints;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +51,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
private Statistics<?> statistics;
private long rowsWrittenSoFar = 0;
+ private int pageRowCount;
ColumnWriterBase(
ColumnDescriptor path,
@@ -84,6 +84,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
private void repetitionLevel(int repetitionLevel) {
repetitionLevelColumn.writeInteger(repetitionLevel);
+ assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries";
+ if (repetitionLevel == 0) {
+ ++pageRowCount;
+ }
}
/**
@@ -299,13 +303,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
/**
* Writes the current data to a new page in the page store
- *
- * @param rowCount
- * how many rows have been written so far
*/
- void writePage(long rowCount) {
- int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
- this.rowsWrittenSoFar = rowCount;
+ void writePage() {
+ this.rowsWrittenSoFar += pageRowCount;
if (DEBUG)
LOG.debug("write page");
try {
@@ -318,6 +318,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
dataColumn.reset();
valueCount = 0;
resetStatistics();
+ pageRowCount = 0;
}
abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index d2d78c4..35fddaf 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -65,10 +65,10 @@ public class TestColumnReaderImpl {
for (int i = 0; i < rows; i++) {
columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
if ((i + 1) % 1000 == 0) {
- columnWriterV2.writePage(i);
+ columnWriterV2.writePage();
}
}
- columnWriterV2.writePage(rows);
+ columnWriterV2.writePage();
columnWriterV2.finalizeColumnChunk();
List<DataPage> pages = pageWriter.getPages();
int valueCount = 0;
@@ -103,10 +103,10 @@ public class TestColumnReaderImpl {
for (int i = 0; i < rows; i++) {
columnWriterV2.writeNull(0, 0);
if ((i + 1) % 1000 == 0) {
- columnWriterV2.writePage(i);
+ columnWriterV2.writePage();
}
}
- columnWriterV2.writePage(rows);
+ columnWriterV2.writePage();
columnWriterV2.finalizeColumnChunk();
List<DataPage> pages = pageWriter.getPages();
int valueCount = 0;
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index c28649e..e5db38c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -138,13 +138,16 @@ public class TestMemColumn {
int r = rs[i % rs.length];
int d = ds[i % ds.length];
LOG.debug("write i: {}", i);
+ if (i != 0 && r == 0) {
+ memColumnsStore.endRecord();
+ }
if (d == 2) {
columnWriter.write((long)i, r, d);
} else {
columnWriter.writeNull(r, d);
}
- memColumnsStore.endRecord();
}
+ memColumnsStore.endRecord();
memColumnsStore.flush();
ColumnReader columnReader = getColumnReader(memPageStore, path, mt);