Posted to commits@parquet.apache.org by ti...@apache.org on 2014/12/04 22:16:25 UTC
[2/3] incubator-parquet-mr git commit: PARQUET-117: implement the new page format for Parquet 2.0
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 04d3eeb..e6e0cc5 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -20,7 +20,6 @@ import static parquet.Log.DEBUG;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import parquet.Log;
import parquet.Preconditions;
@@ -76,8 +75,8 @@ public class RunLengthBitPackingHybridDecoder {
return result;
}
- private void readNext() throws IOException {
- Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
+ private void readNext() throws IOException {
+ Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(in);
mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
@@ -92,7 +91,7 @@ public class RunLengthBitPackingHybridDecoder {
if (DEBUG) LOG.debug("reading " + currentCount + " values BIT PACKED");
currentBuffer = new int[currentCount]; // TODO: reuse a buffer
byte[] bytes = new byte[numGroups * bitWidth];
- // At the end of the file RLE data though, there might not be that many bytes left.
+ // At the end of the file RLE data though, there might not be that many bytes left.
int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0);
bytesToRead = Math.min(bytesToRead, in.available());
new DataInputStream(in).readFully(bytes, 0, bytesToRead);
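For context on what this decoder parses, here is a minimal standalone sketch of the hybrid header logic the hunk above relies on; the class and helper names are illustrative, not the parquet-mr API:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // The low bit of the unsigned varint header selects the mode; the remaining
    // bits carry the run length (RLE) or the number of 8-value groups (PACKED).
    public class HybridHeaderSketch {
      enum Mode { RLE, PACKED }

      static int readUnsignedVarInt(InputStream in) throws IOException {
        int value = 0, shift = 0, b;
        while (((b = in.read()) & 0x80) != 0) {
          value |= (b & 0x7F) << shift;
          shift += 7;
        }
        return value | (b << shift);
      }

      public static void main(String[] args) throws IOException {
        int bitWidth = 3;
        // header = (1 << 1) | 1 -> one bit-packed group of 8 values, then 3 payload bytes
        InputStream in = new ByteArrayInputStream(new byte[] { 0x03, 0x05, 0x39, 0x77 });
        int header = readUnsignedVarInt(in);
        Mode mode = (header & 1) == 0 ? Mode.RLE : Mode.PACKED;
        int currentCount = mode == Mode.RLE ? header >>> 1 : (header >>> 1) * 8;
        // ceil(count * bitWidth / 8) packed bytes follow; at the very end of the
        // stream fewer bytes may remain, hence the Math.min() guard in the hunk above.
        int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
        System.out.println(mode + " count=" + currentCount + " bytesToRead=" + bytesToRead);
      }
    }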
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index f30d3c5..ed0ac97 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -15,12 +15,10 @@
*/
package parquet.column.values.rle;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import parquet.Ints;
import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
import parquet.column.Encoding;
import parquet.column.values.ValuesWriter;
import parquet.io.ParquetEncodingException;
@@ -30,11 +28,9 @@ import parquet.io.ParquetEncodingException;
*/
public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
private final RunLengthBitPackingHybridEncoder encoder;
- private final ByteArrayOutputStream length;
public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity) {
this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity);
- this.length = new ByteArrayOutputStream(4);
}
@Override
@@ -45,7 +41,7 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
throw new ParquetEncodingException(e);
}
}
-
+
@Override
public void writeBoolean(boolean v) {
writeInteger(v ? 1 : 0);
@@ -66,8 +62,7 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
try {
// prepend the length of the column
BytesInput rle = encoder.toBytes();
- BytesUtils.writeIntLittleEndian(length, Ints.checkedCast(rle.size()));
- return BytesInput.concat(BytesInput.from(length.toByteArray()), rle);
+ return BytesInput.concat(BytesInput.fromInt(Ints.checkedCast(rle.size())), rle);
} catch (IOException e) {
throw new ParquetEncodingException(e);
}
@@ -81,7 +76,6 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
@Override
public void reset() {
encoder.reset();
- length.reset();
}
@Override
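The change above drops the ByteArrayOutputStream field that existed only to hold the 4-byte length prefix (and had to be reset() in lockstep); the prefix is now built per call. A standalone sketch of the resulting framing, a little-endian length followed by the RLE payload, with a ByteBuffer standing in for BytesInput.fromInt:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class LengthPrefixSketch {
      // prepend the payload length as 4 little-endian bytes, as getBytes() does
      static byte[] prefixWithLength(byte[] rlePayload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)
            .putInt(rlePayload.length).array());
        out.write(rlePayload);
        return out.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        byte[] framed = prefixWithLength(new byte[] { 0x03, 0x05, 0x39, 0x77 });
        int length = ByteBuffer.wrap(framed, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        System.out.println("payload length = " + length); // 4
      }
    }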
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
index bc048b0..3cf664f 100644
--- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -168,9 +168,11 @@ public class MessageColumnIO extends GroupColumnIO {
private final FieldsMarker[] fieldsWritten;
private final int[] r;
private final ColumnWriter[] columnWriter;
+ private final ColumnWriteStore columns;
private boolean emptyField = true;
public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
+ this.columns = columns;
int maxDepth = 0;
this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
@@ -214,6 +216,7 @@ public class MessageColumnIO extends GroupColumnIO {
@Override
public void endMessage() {
writeNullForMissingFieldsAtCurrentLevel();
+ columns.endRecord();
if (DEBUG) log("< MESSAGE END >");
if (DEBUG) printState();
}
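endMessage() now tells the write store where each record ends. Presumably (consistent with the rest of this patch) this is because the v2 page format records a per-page row count, which a store can only maintain if it sees record boundaries rather than just values. An illustrative sketch, not the parquet-mr API:

    // A repeated field can contribute many values to a single row, so counting
    // values alone cannot recover the row count a v2 page header needs.
    public class RowCountingStoreSketch {
      private long rows = 0;
      private long values = 0;

      void valueWritten() { values++; }

      // called once per record, from endMessage()
      void endRecord() { rows++; }

      public static void main(String[] args) {
        RowCountingStoreSketch store = new RowCountingStoreSketch();
        for (int r = 0; r < 2; r++) {          // two records...
          for (int v = 0; v < 3; v++) {        // ...with three values each
            store.valueWritten();
          }
          store.endRecord();
        }
        System.out.println(store.rows + " rows, " + store.values + " values"); // 2 rows, 6 values
      }
    }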
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
new file mode 100644
index 0000000..325bf43
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
@@ -0,0 +1,105 @@
+package parquet.column.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnReader;
+import parquet.column.ParquetProperties;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV2;
+import parquet.column.page.mem.MemPageReader;
+import parquet.column.page.mem.MemPageWriter;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestColumnReaderImpl {
+
+ private int rows = 13001;
+
+ private static final class ValidatingConverter extends PrimitiveConverter {
+ int count;
+
+ @Override
+ public void addBinary(Binary value) {
+ assertEquals("bar" + count % 10, value.toStringUsingUTF8());
+ ++ count;
+ }
+ }
+
+ @Test
+ public void test() {
+ MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+ ColumnDescriptor col = schema.getColumns().get(0);
+ MemPageWriter pageWriter = new MemPageWriter();
+ ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true));
+ for (int i = 0; i < rows; i++) {
+ columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
+ if ((i + 1) % 1000 == 0) {
+ columnWriterV2.writePage(i);
+ }
+ }
+ columnWriterV2.writePage(rows);
+ columnWriterV2.finalizeColumnChunk();
+ List<DataPage> pages = pageWriter.getPages();
+ int valueCount = 0;
+ int rowCount = 0;
+ for (DataPage dataPage : pages) {
+ valueCount += dataPage.getValueCount();
+ rowCount += ((DataPageV2)dataPage).getRowCount();
+ }
+ assertEquals(rows, rowCount);
+ assertEquals(rows, valueCount);
+ MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+ ValidatingConverter converter = new ValidatingConverter();
+ ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+ for (int i = 0; i < rows; i++) {
+ assertEquals(0, columnReader.getCurrentRepetitionLevel());
+ assertEquals(0, columnReader.getCurrentDefinitionLevel());
+ columnReader.writeCurrentValueToConverter();
+ columnReader.consume();
+ }
+ assertEquals(rows, converter.count);
+ }
+
+ @Test
+ public void testOptional() {
+ MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
+ ColumnDescriptor col = schema.getColumns().get(0);
+ MemPageWriter pageWriter = new MemPageWriter();
+ ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true));
+ for (int i = 0; i < rows; i++) {
+ columnWriterV2.writeNull(0, 0);
+ if ((i + 1) % 1000 == 0) {
+ columnWriterV2.writePage(i);
+ }
+ }
+ columnWriterV2.writePage(rows);
+ columnWriterV2.finalizeColumnChunk();
+ List<DataPage> pages = pageWriter.getPages();
+ int valueCount = 0;
+ int rowCount = 0;
+ for (DataPage dataPage : pages) {
+ valueCount += dataPage.getValueCount();
+ rowCount += ((DataPageV2)dataPage).getRowCount();
+ }
+ assertEquals(rows, rowCount);
+ assertEquals(rows, valueCount);
+ MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+ ValidatingConverter converter = new ValidatingConverter();
+ ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+ for (int i = 0; i < rows; i++) {
+ assertEquals(0, columnReader.getCurrentRepetitionLevel());
+ assertEquals(0, columnReader.getCurrentDefinitionLevel());
+ columnReader.consume();
+ }
+ assertEquals(0, converter.count);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
index 46b3b8f..a386bbb 100644
--- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
@@ -23,17 +23,15 @@ import parquet.Log;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
import parquet.column.ParquetProperties.WriterVersion;
import parquet.column.impl.ColumnReadStoreImpl;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
import parquet.column.page.mem.MemPageStore;
import parquet.example.DummyRecordConverter;
import parquet.io.api.Binary;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
-
public class TestMemColumn {
private static final Log LOG = Log.getLog(TestMemColumn.class);
@@ -42,9 +40,10 @@ public class TestMemColumn {
MessageType schema = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
ColumnDescriptor path = schema.getColumnDescription(new String[] {"foo", "bar"});
MemPageStore memPageStore = new MemPageStore(10);
- ColumnWriter columnWriter = getColumnWriter(path, memPageStore);
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
columnWriter.write(42l, 0, 0);
- columnWriter.flush();
+ memColumnsStore.flush();
ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -56,7 +55,7 @@ public class TestMemColumn {
}
private ColumnWriter getColumnWriter(ColumnDescriptor path, MemPageStore memPageStore) {
- ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
return columnWriter;
}
@@ -75,13 +74,13 @@ public class TestMemColumn {
String[] col = new String[]{"foo", "bar"};
MemPageStore memPageStore = new MemPageStore(10);
- ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
ColumnDescriptor path1 = mt.getColumnDescription(col);
ColumnDescriptor path = path1;
ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
columnWriter.write(Binary.fromString("42"), 0, 0);
- columnWriter.flush();
+ memColumnsStore.flush();
ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -97,7 +96,7 @@ public class TestMemColumn {
MessageType mt = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
String[] col = new String[]{"foo", "bar"};
MemPageStore memPageStore = new MemPageStore(10);
- ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
ColumnDescriptor path1 = mt.getColumnDescription(col);
ColumnDescriptor path = path1;
@@ -105,7 +104,7 @@ public class TestMemColumn {
for (int i = 0; i < 2000; i++) {
columnWriter.write(42l, 0, 0);
}
- columnWriter.flush();
+ memColumnsStore.flush();
ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -121,7 +120,7 @@ public class TestMemColumn {
MessageType mt = MessageTypeParser.parseMessageType("message msg { repeated group foo { repeated int64 bar; } }");
String[] col = new String[]{"foo", "bar"};
MemPageStore memPageStore = new MemPageStore(10);
- ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
ColumnDescriptor path1 = mt.getColumnDescription(col);
ColumnDescriptor path = path1;
@@ -138,7 +137,7 @@ public class TestMemColumn {
columnWriter.writeNull(r, d);
}
}
- columnWriter.flush();
+ memColumnsStore.flush();
ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
int i = 0;
@@ -156,7 +155,7 @@ public class TestMemColumn {
}
}
- private ColumnWriteStoreImpl newColumnWriteStoreImpl(MemPageStore memPageStore) {
- return new ColumnWriteStoreImpl(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+ private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
+ return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java b/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
index f33a531..3c0acd3 100644
--- a/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
+++ b/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
@@ -23,7 +23,7 @@ import org.junit.Test;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
import parquet.column.page.PageReader;
import parquet.column.page.PageWriter;
import parquet.column.page.mem.MemPageStore;
@@ -49,7 +49,7 @@ public class TestMemPageStore {
System.out.println(totalValueCount);
int total = 0;
do {
- Page readPage = pageReader.readPage();
+ DataPage readPage = pageReader.readPage();
total += readPage.getValueCount();
System.out.println(readPage);
// TODO: assert
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
index e6a5d7a..523d87c 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
import parquet.Log;
import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
import parquet.column.page.PageReader;
import parquet.io.ParquetDecodingException;
@@ -31,10 +31,10 @@ public class MemPageReader implements PageReader {
private static final Log LOG = Log.getLog(MemPageReader.class);
private final long totalValueCount;
- private final Iterator<Page> pages;
+ private final Iterator<DataPage> pages;
private final DictionaryPage dictionaryPage;
- public MemPageReader(long totalValueCount, Iterator<Page> pages, DictionaryPage dictionaryPage) {
+ public MemPageReader(long totalValueCount, Iterator<DataPage> pages, DictionaryPage dictionaryPage) {
super();
checkNotNull(pages, "pages");
this.totalValueCount = totalValueCount;
@@ -48,9 +48,9 @@ public class MemPageReader implements PageReader {
}
@Override
- public Page readPage() {
+ public DataPage readPage() {
if (pages.hasNext()) {
- Page next = pages.next();
+ DataPage next = pages.next();
if (DEBUG) LOG.debug("read page " + next);
return next;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
index c3a9fd0..6facf34 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
@@ -23,7 +23,7 @@ import java.util.Map;
import parquet.Log;
import parquet.column.ColumnDescriptor;
import parquet.column.UnknownColumnException;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.column.page.PageWriteStore;
@@ -58,7 +58,7 @@ public class MemPageStore implements PageReadStore, PageWriteStore {
if (pageWriter == null) {
throw new UnknownColumnException(descriptor);
}
- List<Page> pages = new ArrayList<Page>(pageWriter.getPages());
+ List<DataPage> pages = new ArrayList<DataPage>(pageWriter.getPages());
if (Log.DEBUG) LOG.debug("initialize page reader with "+ pageWriter.getTotalValueCount() + " values and " + pages.size() + " pages");
return new MemPageReader(pageWriter.getTotalValueCount(), pages.iterator(), pageWriter.getDictionaryPage());
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
index 01b0873..c70e023 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
@@ -16,52 +16,57 @@
package parquet.column.page.mem;
import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesInput.copy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.column.Encoding;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
import parquet.column.page.PageWriter;
import parquet.column.statistics.Statistics;
import parquet.io.ParquetEncodingException;
-
public class MemPageWriter implements PageWriter {
private static final Log LOG = Log.getLog(MemPageWriter.class);
- private final List<Page> pages = new ArrayList<Page>();
+ private final List<DataPage> pages = new ArrayList<DataPage>();
private DictionaryPage dictionaryPage;
private long memSize = 0;
private long totalValueCount = 0;
- @Deprecated
@Override
- public void writePage(BytesInput bytesInput, int valueCount, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
+ public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
throws IOException {
if (valueCount == 0) {
throw new ParquetEncodingException("illegal page of 0 values");
}
memSize += bytesInput.size();
- pages.add(new Page(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), rlEncoding, dlEncoding, valuesEncoding));
+ pages.add(new DataPageV1(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), statistics, rlEncoding, dlEncoding, valuesEncoding));
totalValueCount += valueCount;
if (DEBUG) LOG.debug("page written for " + bytesInput.size() + " bytes and " + valueCount + " records");
}
@Override
- public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
- throws IOException {
+ public void writePageV2(int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data, Statistics<?> statistics) throws IOException {
if (valueCount == 0) {
throw new ParquetEncodingException("illegal page of 0 values");
}
- memSize += bytesInput.size();
- pages.add(new Page(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), statistics, rlEncoding, dlEncoding, valuesEncoding));
+ long size = repetitionLevels.size() + definitionLevels.size() + data.size();
+ memSize += size;
+ pages.add(DataPageV2.uncompressed(rowCount, nullCount, valueCount, copy(repetitionLevels), copy(definitionLevels), dataEncoding, copy(data), statistics));
totalValueCount += valueCount;
- if (DEBUG) LOG.debug("page written for " + bytesInput.size() + " bytes and " + valueCount + " records");
+ if (DEBUG) LOG.debug("page written for " + size + " bytes and " + valueCount + " records");
+
}
@Override
@@ -69,7 +74,7 @@ public class MemPageWriter implements PageWriter {
return memSize;
}
- public List<Page> getPages() {
+ public List<DataPage> getPages() {
return pages;
}
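Both write paths above snapshot their inputs (BytesInput.copy) before stashing the page. A plausible reason, stated as an assumption: a BytesInput handed to the page writer may be backed by buffers the column writer reuses for the next page, so an in-memory store that keeps pages alive must not alias them. In miniature:

    import java.util.Arrays;

    public class DefensiveCopySketch {
      public static void main(String[] args) {
        byte[] writerBuffer = { 1, 2, 3 };
        byte[] pageWithoutCopy = writerBuffer;                          // aliases the buffer
        byte[] pageWithCopy = Arrays.copyOf(writerBuffer, writerBuffer.length);
        writerBuffer[0] = 99;                                           // writer begins the next page
        System.out.println(pageWithoutCopy[0] + " vs " + pageWithCopy[0]); // 99 vs 1
      }
    }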
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java
index da46b51..9cd31e3 100644
--- a/parquet-column/src/test/java/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -25,7 +25,7 @@ import java.util.logging.Level;
import parquet.Log;
import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
import parquet.column.page.mem.MemPageStore;
import parquet.example.DummyRecordConverter;
import parquet.example.data.GroupWriter;
@@ -74,7 +74,7 @@ public class PerfTest {
private static void write(MemPageStore memPageStore) {
- ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+ ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
MessageColumnIO columnIO = newColumnFactory(schema);
GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
@@ -90,7 +90,7 @@ public class PerfTest {
write(memPageStore, groupWriter, 1000000);
columns.flush();
System.out.println();
- System.out.println(columns.memSize()+" bytes used total");
+ System.out.println(columns.getBufferedSize() + " bytes used total");
System.out.println("max col size: "+columns.maxColMemSize()+" bytes");
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
index dddbcca..d4442df 100644
--- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
@@ -34,20 +34,18 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
-
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+
import parquet.Log;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriteStore;
import parquet.column.ColumnWriter;
import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.statistics.Statistics;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
import parquet.column.page.PageReadStore;
import parquet.column.page.mem.MemPageStore;
import parquet.example.data.Group;
@@ -285,7 +283,7 @@ public class TestColumnIO {
private void writeGroups(MessageType writtenSchema, MemPageStore memPageStore, Group... groups) {
ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
- ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+ ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(writtenSchema);
GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), writtenSchema);
for (Group group : groups) {
@@ -303,7 +301,7 @@ public class TestColumnIO {
log(r2);
MemPageStore memPageStore = new MemPageStore(2);
- ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+ ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
{
@@ -453,7 +451,7 @@ public class TestColumnIO {
private void testSchema(MessageType messageSchema, List<Group> groups) {
MemPageStore memPageStore = new MemPageStore(groups.size());
- ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+ ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(messageSchema);
@@ -505,7 +503,7 @@ public class TestColumnIO {
@Test
public void testPushParser() {
MemPageStore memPageStore = new MemPageStore(1);
- ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+ ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
new GroupWriter(columnIO.getRecordWriter(columns), schema).write(r1);
columns.flush();
@@ -515,14 +513,14 @@ public class TestColumnIO {
}
- private ColumnWriteStoreImpl newColumnWriteStore(MemPageStore memPageStore) {
- return new ColumnWriteStoreImpl(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+ private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
+ return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
}
@Test
public void testEmptyField() {
MemPageStore memPageStore = new MemPageStore(1);
- ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+ ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
recordWriter.startMessage();
@@ -579,77 +577,95 @@ public class TestColumnIO {
"[Name, Url]: http://C, r:0, d:2",
"[Name, Language, Code]: null, r:0, d:1",
"[Name, Language, Country]: null, r:0, d:1"
-
};
- ColumnWriteStore columns = new ColumnWriteStore() {
- int counter = 0;
+ ValidatingColumnWriteStore columns = new ValidatingColumnWriteStore(expected);
+ MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+ GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+ groupWriter.write(r1);
+ groupWriter.write(r2);
+ columns.validate();
+ }
+}
+final class ValidatingColumnWriteStore implements ColumnWriteStore {
+ private final String[] expected;
+ int counter = 0;
+
+ ValidatingColumnWriteStore(String[] expected) {
+ this.expected = expected;
+ }
+
+ @Override
+ public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
+ return new ColumnWriter() {
+ private void validate(Object value, int repetitionLevel,
+ int definitionLevel) {
+ String actual = Arrays.toString(path.getPath())+": "+value+", r:"+repetitionLevel+", d:"+definitionLevel;
+ assertEquals("event #" + counter, expected[counter], actual);
+ ++ counter;
+ }
+
+ @Override
+ public void writeNull(int repetitionLevel, int definitionLevel) {
+ validate(null, repetitionLevel, definitionLevel);
+ }
+
+ @Override
+ public void write(Binary value, int repetitionLevel, int definitionLevel) {
+ validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel);
+ }
+
+ @Override
+ public void write(boolean value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
+ }
+
+ @Override
+ public void write(int value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
+ }
+
+ @Override
+ public void write(long value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
+ }
@Override
- public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
- return new ColumnWriter() {
- private void validate(Object value, int repetitionLevel,
- int definitionLevel) {
- String actual = Arrays.toString(path.getPath())+": "+value+", r:"+repetitionLevel+", d:"+definitionLevel;
- assertEquals("event #" + counter, expected[counter], actual);
- ++ counter;
- }
-
- @Override
- public void writeNull(int repetitionLevel, int definitionLevel) {
- validate(null, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(Binary value, int repetitionLevel, int definitionLevel) {
- validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(boolean value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(int value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(long value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(float value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void write(double value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
- }
-
- @Override
- public void flush() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getBufferedSizeInMemory() {
- throw new UnsupportedOperationException();
- }
- };
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
}
+
@Override
- public void flush() {
- assertEquals("read all events", expected.length, counter);
+ public void write(double value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
}
};
- MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
- GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
- groupWriter.write(r1);
- groupWriter.write(r2);
- columns.flush();
+ }
+
+ public void validate() {
+ assertEquals("read all events", expected.length, counter);
+ }
+
+ @Override
+ public void endRecord() {
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return 0;
+ }
+
+ @Override
+ public long getBufferedSize() {
+ return 0;
+ }
+
+ @Override
+ public String memUsageString() {
+ return null;
}
}
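The inline anonymous ColumnWriteStore above becomes a named test double with an explicit validate() step, rather than piggybacking the final assertion on flush(). The pattern in miniature, with illustrative names:

    // Record events during the run, then verify with an explicit method instead
    // of overloading a lifecycle call such as flush().
    public class RecordingDoubleSketch {
      private final String[] expected;
      private int counter = 0;

      RecordingDoubleSketch(String... expected) { this.expected = expected; }

      void event(String actual) {
        if (!expected[counter].equals(actual)) {
          throw new AssertionError("event #" + counter + ": " + actual);
        }
        counter++;
      }

      void validate() {
        if (counter != expected.length) {
          throw new AssertionError("saw " + counter + " of " + expected.length + " events");
        }
      }

      public static void main(String[] args) {
        RecordingDoubleSketch d = new RecordingDoubleSketch("a", "b");
        d.event("a");
        d.event("b");
        d.validate();
      }
    }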
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
index 0107b36..7acf6f1 100644
--- a/parquet-column/src/test/java/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.junit.Test;
import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
import parquet.column.page.mem.MemPageStore;
import parquet.example.data.Group;
import parquet.example.data.GroupWriter;
@@ -254,7 +254,7 @@ public class TestFiltered {
private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
MemPageStore memPageStore = new MemPageStore(number * 2);
- ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
+ ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
for ( int i = 0; i < number; i++ ) {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index b134264..198b654 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -15,6 +15,9 @@
*/
package parquet.format.converter;
+import static parquet.format.Util.readFileMetaData;
+import static parquet.format.Util.writePageHeader;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -29,13 +32,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.io.UTF8;
import parquet.Log;
import parquet.common.schema.ColumnPath;
import parquet.format.ColumnChunk;
import parquet.format.ColumnMetaData;
import parquet.format.ConvertedType;
import parquet.format.DataPageHeader;
+import parquet.format.DataPageHeaderV2;
import parquet.format.DictionaryPageHeader;
import parquet.format.Encoding;
import parquet.format.FieldRepetitionType;
@@ -60,9 +63,6 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;
import parquet.schema.TypeVisitor;
import parquet.schema.Types;
-import static java.lang.Math.min;
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
public class ParquetMetadataConverter {
private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
@@ -671,14 +671,49 @@ public class ParquetMetadataConverter {
parquet.column.Encoding valuesEncoding) {
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
// TODO: pageHeader.crc = ...;
- pageHeader.data_page_header = new DataPageHeader(
+ pageHeader.setData_page_header(new DataPageHeader(
valueCount,
getEncoding(valuesEncoding),
getEncoding(dlEncoding),
- getEncoding(rlEncoding));
+ getEncoding(rlEncoding)));
+ if (!statistics.isEmpty()) {
+ pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
+ }
+ return pageHeader;
+ }
+
+ public void writeDataPageV2Header(
+ int uncompressedSize, int compressedSize,
+ int valueCount, int nullCount, int rowCount,
+ parquet.column.statistics.Statistics statistics,
+ parquet.column.Encoding dataEncoding,
+ int rlByteLength, int dlByteLength,
+ OutputStream to) throws IOException {
+ writePageHeader(
+ newDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ statistics,
+ dataEncoding,
+ rlByteLength, dlByteLength), to);
+ }
+
+ private PageHeader newDataPageV2Header(
+ int uncompressedSize, int compressedSize,
+ int valueCount, int nullCount, int rowCount,
+ parquet.column.statistics.Statistics<?> statistics,
+ parquet.column.Encoding dataEncoding,
+ int rlByteLength, int dlByteLength) {
+ // TODO: pageHeader.crc = ...;
+ DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+ valueCount, nullCount, rowCount,
+ getEncoding(dataEncoding),
+ dlByteLength, rlByteLength);
if (!statistics.isEmpty()) {
- pageHeader.data_page_header.setStatistics(toParquetStatistics(statistics));
+ dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
}
+ PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
+ pageHeader.setData_page_header_v2(dataPageHeaderV2);
return pageHeader;
}
@@ -686,7 +721,7 @@ public class ParquetMetadataConverter {
int uncompressedSize, int compressedSize, int valueCount,
parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
- pageHeader.dictionary_page_header = new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding));
+ pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
writePageHeader(pageHeader, to);
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
index 2910b9f..b6809a4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -21,13 +21,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import parquet.Ints;
import parquet.Log;
import parquet.column.ColumnDescriptor;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.hadoop.CodecFactory.BytesDecompressor;
+import parquet.io.ParquetDecodingException;
/**
* TODO: should this actually be called RowGroupImpl or something?
@@ -49,15 +53,15 @@ class ColumnChunkPageReadStore implements PageReadStore {
private final BytesDecompressor decompressor;
private final long valueCount;
- private final List<Page> compressedPages;
+ private final List<DataPage> compressedPages;
private final DictionaryPage compressedDictionaryPage;
- ColumnChunkPageReader(BytesDecompressor decompressor, List<Page> compressedPages, DictionaryPage compressedDictionaryPage) {
+ ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
this.decompressor = decompressor;
- this.compressedPages = new LinkedList<Page>(compressedPages);
+ this.compressedPages = new LinkedList<DataPage>(compressedPages);
this.compressedDictionaryPage = compressedDictionaryPage;
int count = 0;
- for (Page p : compressedPages) {
+ for (DataPage p : compressedPages) {
count += p.getValueCount();
}
this.valueCount = count;
@@ -69,23 +73,53 @@ class ColumnChunkPageReadStore implements PageReadStore {
}
@Override
- public Page readPage() {
+ public DataPage readPage() {
if (compressedPages.isEmpty()) {
return null;
}
- Page compressedPage = compressedPages.remove(0);
- try {
- return new Page(
- decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
- compressedPage.getValueCount(),
- compressedPage.getUncompressedSize(),
- compressedPage.getStatistics(),
- compressedPage.getRlEncoding(),
- compressedPage.getDlEncoding(),
- compressedPage.getValueEncoding());
- } catch (IOException e) {
- throw new RuntimeException(e); // TODO: cleanup
- }
+ DataPage compressedPage = compressedPages.remove(0);
+ return compressedPage.accept(new DataPage.Visitor<DataPage>() {
+ @Override
+ public DataPage visit(DataPageV1 dataPageV1) {
+ try {
+ return new DataPageV1(
+ decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
+ dataPageV1.getValueCount(),
+ dataPageV1.getUncompressedSize(),
+ dataPageV1.getStatistics(),
+ dataPageV1.getRlEncoding(),
+ dataPageV1.getDlEncoding(),
+ dataPageV1.getValueEncoding());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not decompress page", e);
+ }
+ }
+
+ @Override
+ public DataPage visit(DataPageV2 dataPageV2) {
+ if (!dataPageV2.isCompressed()) {
+ return dataPageV2;
+ }
+ try {
+ int uncompressedSize = Ints.checkedCast(
+ dataPageV2.getUncompressedSize()
+ - dataPageV2.getDefinitionLevels().size()
+ - dataPageV2.getRepetitionLevels().size());
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ decompressor.decompress(dataPageV2.getData(), uncompressedSize),
+ dataPageV2.getStatistics()
+ );
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not decompress page", e);
+ }
+ }
+ });
}
@Override
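readPage() now returns the abstract DataPage and dispatches through a visitor instead of instanceof checks; note that for v2 pages only the data section is decompressed, since the repetition and definition levels are stored uncompressed. A standalone sketch of the dispatch, with illustrative classes rather than the parquet-mr types:

    abstract class PageSketch {
      interface Visitor<T> {
        T visit(V1 v1);
        T visit(V2 v2);
      }

      abstract <T> T accept(Visitor<T> visitor);

      static final class V1 extends PageSketch {
        <T> T accept(Visitor<T> v) { return v.visit(this); }
      }

      static final class V2 extends PageSketch {
        final boolean compressed;
        V2(boolean compressed) { this.compressed = compressed; }
        <T> T accept(Visitor<T> v) { return v.visit(this); }
      }

      public static void main(String[] args) {
        PageSketch page = new V2(false);
        // each concrete page type routes to its own handler, no casts needed
        String action = page.accept(new Visitor<String>() {
          public String visit(V1 v1) { return "decompress the whole page"; }
          public String visit(V2 v2) { return v2.compressed ? "decompress data only" : "pass through"; }
        });
        System.out.println(action); // pass through
      }
    }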
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index 6d7f685..64fb7cd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,7 +33,6 @@ import parquet.column.page.DictionaryPage;
import parquet.column.page.PageWriteStore;
import parquet.column.page.PageWriter;
import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactory.BytesCompressor;
import parquet.io.ParquetEncodingException;
@@ -69,10 +67,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
}
- @Deprecated
@Override
public void writePage(BytesInput bytes,
int valueCount,
+ Statistics statistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding) throws IOException {
@@ -80,16 +78,15 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
if (uncompressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write page larger than Integer.MAX_VALUE bytes: " +
- uncompressedSize);
+ uncompressedSize);
}
BytesInput compressedBytes = compressor.compress(bytes);
long compressedSize = compressedBytes.size();
if (compressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
- + compressedSize);
+ + compressedSize);
}
- BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
parquetMetadataConverter.writeDataPageHeader(
(int)uncompressedSize,
(int)compressedSize,
@@ -103,6 +100,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
+ this.totalStatistics.mergeStatistics(statistics);
compressedBytes.writeAllTo(buf);
encodings.add(rlEncoding);
encodings.add(dlEncoding);
@@ -110,43 +108,46 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
@Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Statistics statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- if (uncompressedSize > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write page larger than Integer.MAX_VALUE bytes: " +
- uncompressedSize);
- }
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- if (compressedSize > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
- + compressedSize);
- }
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
+ public void writePageV2(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ Statistics<?> statistics) throws IOException {
+ int rlByteLength = toIntWithCheck(repetitionLevels.size());
+ int dlByteLength = toIntWithCheck(definitionLevels.size());
+ int uncompressedSize = toIntWithCheck(
+ data.size() + repetitionLevels.size() + definitionLevels.size()
+ );
+ // TODO: decide if we compress
+ BytesInput compressedData = compressor.compress(data);
+ int compressedSize = toIntWithCheck(
+ compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+ );
+ parquetMetadataConverter.writeDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
+ dataEncoding,
+ rlByteLength, dlByteLength,
buf);
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
- compressedBytes.writeAllTo(buf);
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
+ repetitionLevels.writeAllTo(buf);
+ definitionLevels.writeAllTo(buf);
+ compressedData.writeAllTo(buf);
+ encodings.add(dataEncoding);
+ }
+
+ private int toIntWithCheck(long size) {
+ if (size > Integer.MAX_VALUE) {
+ throw new ParquetEncodingException(
+ "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+ size);
+ }
+ return (int)size;
}
@Override
@@ -199,28 +200,20 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
- private final MessageType schema;
- private final BytesCompressor compressor;
- private final int initialSize;
public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
- this.compressor = compressor;
- this.schema = schema;
- this.initialSize = initialSize;
+ for (ColumnDescriptor path : schema.getColumns()) {
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
+ }
}
@Override
public PageWriter getPageWriter(ColumnDescriptor path) {
- if (!writers.containsKey(path)) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
- }
return writers.get(path);
}
public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- List<ColumnDescriptor> columns = schema.getColumns();
- for (ColumnDescriptor columnDescriptor : columns) {
- ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+ for (ColumnChunkPageWriter pageWriter : writers.values()) {
pageWriter.writeToFileWriter(writer);
}
}
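writePageV2 keeps the levels outside the compression path: the header is followed by raw repetition levels, raw definition levels, then the (possibly) compressed data, and both size fields in the header include the raw level bytes. The arithmetic, isolated:

    public class PageV2SizesSketch {
      // mirrors the toIntWithCheck helper introduced above
      static int toIntWithCheck(long size) {
        if (size > Integer.MAX_VALUE) {
          throw new IllegalStateException(
              "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
        }
        return (int) size;
      }

      public static void main(String[] args) {
        long rlBytes = 128, dlBytes = 256;                 // raw level sections, never compressed
        long dataBytes = 60_000, compressedDataBytes = 14_000;
        int uncompressedSize = toIntWithCheck(dataBytes + rlBytes + dlBytes);
        int compressedSize = toIntWithCheck(compressedDataBytes + rlBytes + dlBytes);
        System.out.println(uncompressedSize + " -> " + compressedSize); // 60384 -> 14384
      }
    }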
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index d73c811..79ef5ac 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
import java.util.Map;
import parquet.Log;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ParquetProperties;
import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
+import parquet.column.impl.ColumnWriteStoreV2;
import parquet.hadoop.CodecFactory.BytesCompressor;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -49,17 +52,16 @@ class InternalParquetRecordWriter<T> {
private final int rowGroupSize;
private final int pageSize;
private final BytesCompressor compressor;
- private final int dictionaryPageSize;
- private final boolean enableDictionary;
private final boolean validating;
- private final WriterVersion writerVersion;
+ private final ParquetProperties parquetProperties;
private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
- private ColumnWriteStoreImpl columnStore;
+ private ColumnWriteStore columnStore;
private ColumnChunkPageWriteStore pageStore;
+
/**
* @param parquetFileWriter the file to write to
* @param writeSupport the class to convert incoming records
@@ -87,10 +89,8 @@ class InternalParquetRecordWriter<T> {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.compressor = compressor;
- this.dictionaryPageSize = dictionaryPageSize;
- this.enableDictionary = enableDictionary;
this.validating = validating;
- this.writerVersion = writerVersion;
+ this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
initStore();
}
@@ -103,7 +103,11 @@ class InternalParquetRecordWriter<T> {
// we don't want this number to be too small either
// ideally, slightly bigger than the page size, but not bigger than the block buffer
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
- columnStore = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+ columnStore = parquetProperties.newColumnWriteStore(
+ schema,
+ pageStore,
+ pageSize,
+ initialPageBufferSize);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}
@@ -124,7 +128,7 @@ class InternalParquetRecordWriter<T> {
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
- long memSize = columnStore.memSize();
+ long memSize = columnStore.getBufferedSize();
if (memSize > rowGroupSize) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSize, recordCount));
flushRowGroupToStore();
@@ -143,8 +147,8 @@ class InternalParquetRecordWriter<T> {
private void flushRowGroupToStore()
throws IOException {
- LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.allocatedSize()));
- if (columnStore.allocatedSize() > 3 * (long)rowGroupSize) {
+ LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
+ if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSize) {
LOG.warn("Too much memory used: " + columnStore.memUsageString());
}
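The scattered dictionary/version knobs collapse into a ParquetProperties object, which also becomes the factory that picks a V1 or V2 column write store. The shape of that refactor, with illustrative names rather than the real signatures:

    public class WriterPropsSketch {
      enum WriterVersion { PARQUET_1_0, PARQUET_2_0 }

      final int dictionaryPageSize;
      final WriterVersion version;
      final boolean enableDictionary;

      WriterPropsSketch(int dictionaryPageSize, WriterVersion version, boolean enableDictionary) {
        this.dictionaryPageSize = dictionaryPageSize;
        this.version = version;
        this.enableDictionary = enableDictionary;
      }

      // the properties object, not the record writer, decides which store to build
      String newColumnWriteStore() {
        switch (version) {
          case PARQUET_1_0: return "ColumnWriteStoreV1";
          case PARQUET_2_0: return "ColumnWriteStoreV2";
          default: throw new IllegalArgumentException("unknown version " + version);
        }
      }

      public static void main(String[] args) {
        WriterPropsSketch props = new WriterPropsSketch(1024, WriterVersion.PARQUET_2_0, true);
        System.out.println(props.newColumnWriteStore()); // ColumnWriteStoreV2
      }
    }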
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index 74d65fe..47308c5 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -19,7 +19,10 @@ import static parquet.Log.DEBUG;
import static parquet.bytes.BytesUtils.readIntLittleEndian;
import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static parquet.hadoop.ParquetFileWriter.*;
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import static parquet.hadoop.ParquetFileWriter.MAGIC;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -29,7 +32,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -48,15 +50,19 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.Utils;
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
import parquet.common.schema.ColumnPath;
+import parquet.format.DataPageHeader;
+import parquet.format.DataPageHeaderV2;
+import parquet.format.DictionaryPageHeader;
import parquet.format.PageHeader;
import parquet.format.Util;
import parquet.format.converter.ParquetMetadataConverter;
@@ -81,7 +87,7 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
- private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+ private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
/**
* for files provided, check if there's a summary file.
@@ -423,7 +429,7 @@ public class ParquetFileReader implements Closeable {
throw new RuntimeException("corrupted file: the footer index is not within the file");
}
f.seek(footerIndex);
- return parquetMetadataConverter.readParquetMetadata(f, filter);
+ return converter.readParquetMetadata(f, filter);
} finally {
f.close();
}
@@ -535,41 +541,63 @@ public class ParquetFileReader implements Closeable {
* @return the list of pages
*/
public ColumnChunkPageReader readAllPages() throws IOException {
- List<Page> pagesInChunk = new ArrayList<Page>();
+ List<DataPage> pagesInChunk = new ArrayList<DataPage>();
DictionaryPage dictionaryPage = null;
long valuesCountReadSoFar = 0;
while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
PageHeader pageHeader = readPageHeader();
+ int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+ int compressedPageSize = pageHeader.getCompressed_page_size();
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// there is only one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
}
- dictionaryPage =
+ DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+ dictionaryPage =
new DictionaryPage(
- this.readAsBytesInput(pageHeader.compressed_page_size),
- pageHeader.uncompressed_page_size,
- pageHeader.dictionary_page_header.num_values,
- parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+ this.readAsBytesInput(compressedPageSize),
+ uncompressedPageSize,
+ dicHeader.getNum_values(),
+ converter.getEncoding(dicHeader.getEncoding())
);
break;
case DATA_PAGE:
+ DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
pagesInChunk.add(
- new Page(
- this.readAsBytesInput(pageHeader.compressed_page_size),
- pageHeader.data_page_header.num_values,
- pageHeader.uncompressed_page_size,
- ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ new DataPageV1(
+ this.readAsBytesInput(compressedPageSize),
+ dataHeaderV1.getNum_values(),
+ uncompressedPageSize,
+ fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
+ converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getEncoding())
));
- valuesCountReadSoFar += pageHeader.data_page_header.num_values;
+ valuesCountReadSoFar += dataHeaderV1.getNum_values();
+ break;
+ case DATA_PAGE_V2:
+ DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+ int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
+ pagesInChunk.add(
+ new DataPageV2(
+ dataHeaderV2.getNum_rows(),
+ dataHeaderV2.getNum_nulls(),
+ dataHeaderV2.getNum_values(),
+ this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+ this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+ converter.getEncoding(dataHeaderV2.getEncoding()),
+ this.readAsBytesInput(dataSize),
+ uncompressedPageSize,
+ fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
+ dataHeaderV2.isIs_compressed()
+ ));
+ valuesCountReadSoFar += dataHeaderV2.getNum_values();
break;
default:
- if (DEBUG) LOG.debug("skipping page of type " + pageHeader.type + " of size " + pageHeader.compressed_page_size);
- this.skip(pageHeader.compressed_page_size);
+ if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
+ this.skip(compressedPageSize);
break;
}
}
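
A note on the DATA_PAGE_V2 branch above: in the v2 page format the repetition and definition levels are written first and are never compressed, so the reader can slice the chunk using the byte lengths recorded in the header; only the remaining values section is subject to the chunk's codec. A minimal sketch of that slicing over a fully buffered page (the class and field names are illustrative, not part of the patch):

    import java.util.Arrays;

    // Illustrative only: slice a buffered v2 page the way readAllPages does
    // with three consecutive readAsBytesInput calls. The values section is
    // whatever remains of compressed_page_size after the two level sections.
    class V2PageSlices {
      final byte[] repetitionLevels;
      final byte[] definitionLevels;
      final byte[] values;

      V2PageSlices(byte[] page, int rlLen, int dlLen) {
        this.repetitionLevels = Arrays.copyOfRange(page, 0, rlLen);
        this.definitionLevels = Arrays.copyOfRange(page, rlLen, rlLen + dlLen);
        this.values = Arrays.copyOfRange(page, rlLen + dlLen, page.length);
      }
    }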
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..f499d1a
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -0,0 +1,107 @@
+package parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.bytes.LittleEndianDataInputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DataPageV2;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BinaryStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestColumnChunkPageWriteStore {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
+ Path root = file.getParent();
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(root)) {
+ fs.delete(root, true);
+ }
+ fs.mkdirs(root);
+ CodecFactory f = new CodecFactory(conf);
+ int pageSize = 1024;
+ int initialSize = 1024;
+ MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
+ ColumnDescriptor col = schema.getColumns().get(0);
+ Encoding dataEncoding = Encoding.PLAIN;
+ int valueCount = 10;
+ int d = 1;
+ int r = 2;
+ int v = 3;
+ BytesInput definitionLevels = BytesInput.fromInt(d);
+ BytesInput repetitionLevels = BytesInput.fromInt(r);
+ Statistics<?> statistics = new BinaryStatistics();
+ BytesInput data = BytesInput.fromInt(v);
+ int rowCount = 5;
+ int nullCount = 1;
+ CompressionCodecName codec = GZIP;
+
+ {
+ ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+ writer.start();
+ writer.startBlock(rowCount);
+ {
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize), schema, initialSize);
+ PageWriter pageWriter = store.getPageWriter(col);
+ pageWriter.writePageV2(
+ rowCount, nullCount, valueCount,
+ repetitionLevels, definitionLevels,
+ dataEncoding, data,
+ statistics);
+ store.flushToFileWriter(writer);
+ }
+ writer.endBlock();
+ writer.end(new HashMap<String, String>());
+ }
+
+ {
+ ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
+ ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
+ PageReadStore rowGroup = reader.readNextRowGroup();
+ PageReader pageReader = rowGroup.getPageReader(col);
+ DataPageV2 page = (DataPageV2)pageReader.readPage();
+ assertEquals(rowCount, page.getRowCount());
+ assertEquals(nullCount, page.getNullCount());
+ assertEquals(valueCount, page.getValueCount());
+ assertEquals(d, intValue(page.getDefinitionLevels()));
+ assertEquals(r, intValue(page.getRepetitionLevels()));
+ assertEquals(dataEncoding, page.getDataEncoding());
+ assertEquals(v, intValue(page.getData()));
+ assertEquals(statistics.toString(), page.getStatistics().toString());
+ reader.close();
+ }
+ }
+
+ private int intValue(BytesInput in) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ in.writeAllTo(baos);
+ LittleEndianDataInputStream is = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ int i = is.readInt();
+ is.close();
+ return i;
+ }
+
+}
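
The intValue helper above round-trips correctly because BytesInput.fromInt serializes in little-endian byte order, Parquet's on-disk convention, and LittleEndianDataInputStream reads it back the same way. A standalone sketch of the same round-trip using only the JDK, for anyone verifying the byte order (the method name is illustrative):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Little-endian int round-trip, equivalent to pairing BytesInput.fromInt
    // with LittleEndianDataInputStream as the test does.
    static int littleEndianRoundTrip(int v) {
      byte[] bytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array();
      return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }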
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
index c86753e..1d45469 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
@@ -42,7 +42,8 @@ import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.column.statistics.BinaryStatistics;
@@ -369,9 +370,9 @@ public class TestParquetFileWriter {
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
- Page page = pageReader.readPage();
+ DataPage page = pageReader.readPage();
assertEquals(values, page.getValueCount());
- assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
+ assertArrayEquals(bytes.toByteArray(), ((DataPageV1)page).getBytes().toByteArray());
}
@Test
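
Since readPage now returns the abstract DataPage, v1-specific accessors such as getBytes() need either a cast, as above, or double dispatch. Assuming the DataPage introduced by this patch carries the accept/Visitor pair over DataPageV1 and DataPageV2 (as the page consumers elsewhere in this commit use, and with DataPageV2 imported as well), a cast-free variant would look like:

    // Sketch: fetch the page body without an instanceof cast.
    static BytesInput pageBytes(DataPage page) {
      return page.accept(new DataPage.Visitor<BytesInput>() {
        @Override
        public BytesInput visit(DataPageV1 v1) {
          return v1.getBytes(); // levels and values in one buffer
        }
        @Override
        public BytesInput visit(DataPageV2 v2) {
          return v2.getData(); // values only; v2 keeps levels separate
        }
      });
    }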
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
new file mode 100644
index 0000000..23a1f13
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
@@ -0,0 +1,117 @@
+package parquet.hadoop;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.PLAIN_DICTIONARY;
+import static parquet.column.Encoding.RLE_DICTIONARY;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.hadoop.ParquetFileReader.readFooter;
+import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import parquet.column.Encoding;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroupFactory;
+import parquet.hadoop.example.GroupReadSupport;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+
+public class TestParquetWriterNewPage {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path root = new Path("target/tests/TestParquetWriterNewPage/");
+ FileSystem fs = root.getFileSystem(conf);
+ if (fs.exists(root)) {
+ fs.delete(root, true);
+ }
+ fs.mkdirs(root);
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required binary binary_field; "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required boolean boolean_field; "
+ + "required float float_field; "
+ + "required double double_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "required int96 int96_field; "
+ + "optional binary null_field; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ Map<String, Encoding> expected = new HashMap<String, Encoding>();
+ expected.put("10-" + PARQUET_1_0, PLAIN_DICTIONARY);
+ expected.put("1000-" + PARQUET_1_0, PLAIN);
+ expected.put("10-" + PARQUET_2_0, RLE_DICTIONARY);
+ expected.put("1000-" + PARQUET_2_0, DELTA_BYTE_ARRAY);
+ for (int modulo : asList(10, 1000)) {
+ for (WriterVersion version : WriterVersion.values()) {
+ Path file = new Path(root, version.name() + "_" + modulo);
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(
+ file,
+ new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+ for (int i = 0; i < 1000; i++) {
+ writer.write(
+ f.newGroup()
+ .append("binary_field", "test" + (i % modulo))
+ .append("int32_field", 32)
+ .append("int64_field", 64l)
+ .append("boolean_field", true)
+ .append("float_field", 1.0f)
+ .append("double_field", 2.0d)
+ .append("flba_field", "foo")
+ .append("int96_field", Binary.fromByteArray(new byte[12])));
+ }
+ writer.close();
+
+ ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
+ for (int i = 0; i < 1000; i++) {
+ Group group = reader.read();
+ assertEquals("test" + (i % modulo), group.getBinary("binary_field", 0).toStringUsingUTF8());
+ assertEquals(32, group.getInteger("int32_field", 0));
+ assertEquals(64L, group.getLong("int64_field", 0));
+ assertEquals(true, group.getBoolean("boolean_field", 0));
+ assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+ assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+ assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+ assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+ assertEquals(0, group.getFieldRepetitionCount("null_field"));
+ }
+ reader.close();
+ ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+ for (BlockMetaData blockMetaData : footer.getBlocks()) {
+ for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
+ if (column.getPath().toDotString().equals("binary_field")) {
+ String key = modulo + "-" + version;
+ Encoding expectedEncoding = expected.get(key);
+ assertTrue(
+ key + ":" + column.getEncodings() + " should contain " + expectedEncoding,
+ column.getEncodings().contains(expectedEncoding));
+ }
+ }
+ }
+ }
+ }
+ }
+}
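
What the expected-encodings map in this test pins down: with modulo 10 the binary_field column holds only ten distinct values, so dictionary encoding sticks (PLAIN_DICTIONARY under PARQUET_1_0, RLE_DICTIONARY under PARQUET_2_0); with 1000 distinct values the dictionary outgrows the 512-byte dictionary page size set in the constructor and the writer falls back to the version's plain encoding (PLAIN for 1.0, DELTA_BYTE_ARRAY for 2.0 binary columns). The footer walk at the end can be read as a small predicate; this sketch is the same traversal the test performs inline (the helper name is illustrative):

    // Did any row group's chunk for the named column record the encoding?
    static boolean columnUsesEncoding(ParquetMetadata footer, String dottedPath, Encoding encoding) {
      for (BlockMetaData block : footer.getBlocks()) {
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          if (chunk.getPath().toDotString().equals(dottedPath)
              && chunk.getEncodings().contains(encoding)) {
            return true;
          }
        }
      }
      return false;
    }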
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java b/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
deleted file mode 100644
index 24d634e..0000000
--- a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.pig;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.column.page.Page;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.column.page.mem.MemPageStore;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-
-public class GenerateIntTestFile {
- private static final Log LOG = Log.getLog(GenerateIntTestFile.class);
-
- public static void main(String[] args) throws Throwable {
- File out = new File("testdata/from_java/int_test_file");
- if (out.exists()) {
- if (!out.delete()) {
- throw new RuntimeException("can not remove existing file " + out.getAbsolutePath());
- }
- }
- Path testFile = new Path(out.toURI());
- Configuration configuration = new Configuration();
- {
- MessageType schema = new MessageType("int_test_file", new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int_col"));
-
- MemPageStore pageStore = new MemPageStore(100);
- ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 8*1024, 8*1024, 8*1024, false, WriterVersion.PARQUET_1_0);
- MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-
- RecordConsumer recordWriter = columnIO.getRecordWriter(store);
-
- int recordCount = 0;
- for (int i = 0; i < 100; i++) {
- recordWriter.startMessage();
- recordWriter.startField("int_col", 0);
- if (i % 10 != 0) {
- recordWriter.addInteger(i);
- }
- recordWriter.endField("int_col", 0);
- recordWriter.endMessage();
- ++ recordCount;
- }
- store.flush();
-
-
- writeToFile(testFile, configuration, schema, pageStore, recordCount);
- }
-
- {
- readTestFile(testFile, configuration);
- }
- }
-
- public static void readTestFile(Path testFile, Configuration configuration)
- throws IOException {
- ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testFile);
- MessageType schema = readFooter.getFileMetaData().getSchema();
- ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testFile, readFooter.getBlocks(), schema.getColumns());
- PageReadStore pages = parquetFileReader.readNextRowGroup();
- System.out.println(pages.getRowCount());
- }
-
- public static void writeToFile(Path file, Configuration configuration, MessageType schema, MemPageStore pageStore, int recordCount)
- throws IOException {
- ParquetFileWriter w = startFile(file, configuration, schema);
- writeBlock(schema, pageStore, recordCount, w);
- endFile(w);
- }
-
- public static void endFile(ParquetFileWriter w) throws IOException {
- w.end(new HashMap<String, String>());
- }
-
- public static void writeBlock(MessageType schema, MemPageStore pageStore,
- int recordCount, ParquetFileWriter w) throws IOException {
- w.startBlock(recordCount);
- List<ColumnDescriptor> columns = schema.getColumns();
- for (ColumnDescriptor columnDescriptor : columns) {
- PageReader pageReader = pageStore.getPageReader(columnDescriptor);
- long totalValueCount = pageReader.getTotalValueCount();
- w.startColumn(columnDescriptor, totalValueCount, CompressionCodecName.UNCOMPRESSED);
- int n = 0;
- do {
- Page page = pageReader.readPage();
- n += page.getValueCount();
- // TODO: change INTFC
- w.writeDataPage(
- page.getValueCount(),
- (int)page.getBytes().size(),
- BytesInput.from(page.getBytes().toByteArray()),
- page.getRlEncoding(),
- page.getDlEncoding(),
- page.getValueEncoding());
- } while (n < totalValueCount);
- w.endColumn();
- }
- w.endBlock();
- }
-
- public static ParquetFileWriter startFile(Path file,
- Configuration configuration, MessageType schema) throws IOException {
- ParquetFileWriter w = new ParquetFileWriter(configuration, schema, file);
- w.start();
- return w;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java b/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
deleted file mode 100644
index 106dc30..0000000
--- a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.pig;
-
-import static parquet.pig.GenerateIntTestFile.readTestFile;
-import static parquet.pig.GenerateIntTestFile.writeToFile;
-
-import java.io.File;
-import java.io.IOException;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.column.page.mem.MemPageStore;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-
-public class GenerateTPCH {
- private static final Log LOG = Log.getLog(GenerateTPCH.class);
-
- public static void main(String[] args) throws IOException {
- File out = new File("testdata/from_java/tpch/customer");
- if (out.exists()) {
- if (!out.delete()) {
- throw new RuntimeException("can not remove existing file " + out.getAbsolutePath());
- }
- }
- Path testFile = new Path(out.toURI());
- Configuration configuration = new Configuration();
- MessageType schema = new MessageType("customer",
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "c_custkey"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_name"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_address"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "c_nationkey"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_phone"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "c_acctbal"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_mktsegment"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment")
- );
-
- MemPageStore pageStore = new MemPageStore(150000);
- ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 20*1024, 1*1024, 20*1024, false, WriterVersion.PARQUET_1_0);
- MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-
- RecordConsumer recordWriter = columnIO.getRecordWriter(store);
-
- int recordCount = 0;
- for (int i = 0; i < 150000; i++) {
- recordWriter.startMessage();
- writeField(recordWriter, 0, "c_custkey", i % 10 == 0 ? null : i);
- writeField(recordWriter, 1, "c_name", i % 11 == 0 ? null : "name_" + i);
- writeField(recordWriter, 2, "c_address", i % 12 == 0 ? null : "add_" + i);
- writeField(recordWriter, 3, "c_nationkey", i % 13 == 0 ? null : i);
- writeField(recordWriter, 4, "c_phone", i % 14 == 0 ? null : "phone_" + i);
- writeField(recordWriter, 5, "c_acctbal", i % 15 == 0 ? null : 1.2d * i);
- writeField(recordWriter, 6, "c_mktsegment", i % 16 == 0 ? null : "mktsegment_" + i);
- writeField(recordWriter, 7, "c_comment", i % 17 == 0 ? null : "comment_" + i);
- recordWriter.endMessage();
- ++ recordCount;
- }
- store.flush();
- System.out.printf("mem size %,d, maxColSize %,d, allocated %,d\n", store.memSize(), store.maxColMemSize(), store.allocatedSize());
- System.out.println(store.memUsageString());
- writeToFile(testFile, configuration, schema, pageStore, recordCount);
-
- try {
- readTestFile(testFile, configuration);
- } catch (Exception e) {
- LOG.error("failed reading", e);
- }
-
- }
-
- private static void writeField(RecordConsumer recordWriter, int index, String name, Object value) {
- if (value != null) {
- recordWriter.startField(name, index);
- if (value instanceof Integer) {
- recordWriter.addInteger((Integer)value);
- } else if (value instanceof String) {
- recordWriter.addBinary(Binary.fromString((String)value));
- } else if (value instanceof Double) {
- recordWriter.addDouble((Double)value);
- } else {
- throw new IllegalArgumentException(value.getClass().getName() + " not supported");
- }
- recordWriter.endField(name, index);
- }
- }
-}