You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ti...@apache.org on 2014/12/04 22:16:25 UTC

[2/3] incubator-parquet-mr git commit: PARQUET-117: implement the new page format for Parquet 2.0

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 04d3eeb..e6e0cc5 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -20,7 +20,6 @@ import static parquet.Log.DEBUG;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 
 import parquet.Log;
 import parquet.Preconditions;
@@ -76,8 +75,8 @@ public class RunLengthBitPackingHybridDecoder {
     return result;
   }
 
-  private void readNext() throws IOException {	
-	Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
+  private void readNext() throws IOException {
+    Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
     final int header = BytesUtils.readUnsignedVarInt(in);
     mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
     switch (mode) {
@@ -92,7 +91,7 @@ public class RunLengthBitPackingHybridDecoder {
       if (DEBUG) LOG.debug("reading " + currentCount + " values BIT PACKED");
       currentBuffer = new int[currentCount]; // TODO: reuse a buffer
       byte[] bytes = new byte[numGroups * bitWidth];
-      // At the end of the file RLE data though, there might not be that many bytes left. 
+      // At the end of the file RLE data though, there might not be that many bytes left.
       int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0);
       bytesToRead = Math.min(bytesToRead, in.available());
       new DataInputStream(in).readFully(bytes, 0, bytesToRead);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index f30d3c5..ed0ac97 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -15,12 +15,10 @@
  */
 package parquet.column.values.rle;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import parquet.Ints;
 import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
 import parquet.column.Encoding;
 import parquet.column.values.ValuesWriter;
 import parquet.io.ParquetEncodingException;
@@ -30,11 +28,9 @@ import parquet.io.ParquetEncodingException;
  */
 public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
   private final RunLengthBitPackingHybridEncoder encoder;
-  private final ByteArrayOutputStream length;
 
   public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity) {
     this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity);
-    this.length = new ByteArrayOutputStream(4);
   }
 
   @Override
@@ -45,7 +41,7 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
       throw new ParquetEncodingException(e);
     }
   }
-  
+
   @Override
   public void writeBoolean(boolean v) {
     writeInteger(v ? 1 : 0);
@@ -66,8 +62,7 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
     try {
       // prepend the length of the column
       BytesInput rle = encoder.toBytes();
-      BytesUtils.writeIntLittleEndian(length, Ints.checkedCast(rle.size()));
-      return BytesInput.concat(BytesInput.from(length.toByteArray()), rle);
+      return BytesInput.concat(BytesInput.fromInt(Ints.checkedCast(rle.size())), rle);
     } catch (IOException e) {
       throw new ParquetEncodingException(e);
     }
@@ -81,7 +76,6 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
   @Override
   public void reset() {
     encoder.reset();
-    length.reset();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
index bc048b0..3cf664f 100644
--- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -168,9 +168,11 @@ public class MessageColumnIO extends GroupColumnIO {
     private final FieldsMarker[] fieldsWritten;
     private final int[] r;
     private final ColumnWriter[] columnWriter;
+    private final ColumnWriteStore columns;
     private boolean emptyField = true;
 
     public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
+      this.columns = columns;
       int maxDepth = 0;
       this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
       for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
@@ -214,6 +216,7 @@ public class MessageColumnIO extends GroupColumnIO {
     @Override
     public void endMessage() {
       writeNullForMissingFieldsAtCurrentLevel();
+      columns.endRecord();
       if (DEBUG) log("< MESSAGE END >");
       if (DEBUG) printState();
     }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
new file mode 100644
index 0000000..325bf43
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
@@ -0,0 +1,105 @@
+package parquet.column.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnReader;
+import parquet.column.ParquetProperties;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV2;
+import parquet.column.page.mem.MemPageReader;
+import parquet.column.page.mem.MemPageWriter;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestColumnReaderImpl {
+
+  private int rows = 13001;
+
+  private static final class ValidatingConverter extends PrimitiveConverter {
+    int count;
+
+    @Override
+    public void addBinary(Binary value) {
+      assertEquals("bar" + count % 10, value.toStringUsingUTF8());
+      ++ count;
+    }
+  }
+
+  @Test
+  public void test() {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    MemPageWriter pageWriter = new MemPageWriter();
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true));
+    for (int i = 0; i < rows; i++) {
+      columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
+      if ((i + 1) % 1000 == 0) {
+        columnWriterV2.writePage(i);
+      }
+    }
+    columnWriterV2.writePage(rows);
+    columnWriterV2.finalizeColumnChunk();
+    List<DataPage> pages = pageWriter.getPages();
+    int valueCount = 0;
+    int rowCount = 0;
+    for (DataPage dataPage : pages) {
+      valueCount += dataPage.getValueCount();
+      rowCount += ((DataPageV2)dataPage).getRowCount();
+    }
+    assertEquals(rows, rowCount);
+    assertEquals(rows, valueCount);
+    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+    ValidatingConverter converter = new ValidatingConverter();
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    for (int i = 0; i < rows; i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      columnReader.writeCurrentValueToConverter();
+      columnReader.consume();
+    }
+    assertEquals(rows, converter.count);
+  }
+
+  @Test
+  public void testOptional() {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    MemPageWriter pageWriter = new MemPageWriter();
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true));
+    for (int i = 0; i < rows; i++) {
+      columnWriterV2.writeNull(0, 0);
+      if ((i + 1) % 1000 == 0) {
+        columnWriterV2.writePage(i);
+      }
+    }
+    columnWriterV2.writePage(rows);
+    columnWriterV2.finalizeColumnChunk();
+    List<DataPage> pages = pageWriter.getPages();
+    int valueCount = 0;
+    int rowCount = 0;
+    for (DataPage dataPage : pages) {
+      valueCount += dataPage.getValueCount();
+      rowCount += ((DataPageV2)dataPage).getRowCount();
+    }
+    assertEquals(rows, rowCount);
+    assertEquals(rows, valueCount);
+    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+    ValidatingConverter converter = new ValidatingConverter();
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    for (int i = 0; i < rows; i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      columnReader.consume();
+    }
+    assertEquals(0, converter.count);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
index 46b3b8f..a386bbb 100644
--- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
@@ -23,17 +23,15 @@ import parquet.Log;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnReader;
 import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
 import parquet.column.ParquetProperties.WriterVersion;
 import parquet.column.impl.ColumnReadStoreImpl;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
 import parquet.column.page.mem.MemPageStore;
 import parquet.example.DummyRecordConverter;
 import parquet.io.api.Binary;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
-
 public class TestMemColumn {
   private static final Log LOG = Log.getLog(TestMemColumn.class);
 
@@ -42,9 +40,10 @@ public class TestMemColumn {
     MessageType schema = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
     ColumnDescriptor path = schema.getColumnDescription(new String[] {"foo", "bar"});
     MemPageStore memPageStore = new MemPageStore(10);
-    ColumnWriter columnWriter = getColumnWriter(path, memPageStore);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(42l, 0, 0);
-    columnWriter.flush();
+    memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
     for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -56,7 +55,7 @@ public class TestMemColumn {
   }
 
   private ColumnWriter getColumnWriter(ColumnDescriptor path, MemPageStore memPageStore) {
-    ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     return columnWriter;
   }
@@ -75,13 +74,13 @@ public class TestMemColumn {
     String[] col = new String[]{"foo", "bar"};
     MemPageStore memPageStore = new MemPageStore(10);
 
-    ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnDescriptor path1 = mt.getColumnDescription(col);
     ColumnDescriptor path = path1;
 
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(Binary.fromString("42"), 0, 0);
-    columnWriter.flush();
+    memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
     for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -97,7 +96,7 @@ public class TestMemColumn {
     MessageType mt = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
     String[] col = new String[]{"foo", "bar"};
     MemPageStore memPageStore = new MemPageStore(10);
-    ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnDescriptor path1 = mt.getColumnDescription(col);
     ColumnDescriptor path = path1;
 
@@ -105,7 +104,7 @@ public class TestMemColumn {
     for (int i = 0; i < 2000; i++) {
       columnWriter.write(42l, 0, 0);
     }
-    columnWriter.flush();
+    memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
     for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
@@ -121,7 +120,7 @@ public class TestMemColumn {
     MessageType mt = MessageTypeParser.parseMessageType("message msg { repeated group foo { repeated int64 bar; } }");
     String[] col = new String[]{"foo", "bar"};
     MemPageStore memPageStore = new MemPageStore(10);
-    ColumnWriteStoreImpl memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnDescriptor path1 = mt.getColumnDescription(col);
     ColumnDescriptor path = path1;
 
@@ -138,7 +137,7 @@ public class TestMemColumn {
         columnWriter.writeNull(r, d);
       }
     }
-    columnWriter.flush();
+    memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
     int i = 0;
@@ -156,7 +155,7 @@ public class TestMemColumn {
     }
   }
 
-  private ColumnWriteStoreImpl newColumnWriteStoreImpl(MemPageStore memPageStore) {
-    return new ColumnWriteStoreImpl(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+  private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
+    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java b/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
index f33a531..3c0acd3 100644
--- a/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
+++ b/parquet-column/src/test/java/parquet/column/mem/TestMemPageStore.java
@@ -23,7 +23,7 @@ import org.junit.Test;
 
 import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
 import parquet.column.page.PageReader;
 import parquet.column.page.PageWriter;
 import parquet.column.page.mem.MemPageStore;
@@ -49,7 +49,7 @@ public class TestMemPageStore {
     System.out.println(totalValueCount);
     int total = 0;
     do {
-      Page readPage = pageReader.readPage();
+      DataPage readPage = pageReader.readPage();
       total += readPage.getValueCount();
       System.out.println(readPage);
       // TODO: assert

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
index e6a5d7a..523d87c 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageReader.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 
 import parquet.Log;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
 import parquet.column.page.PageReader;
 import parquet.io.ParquetDecodingException;
 
@@ -31,10 +31,10 @@ public class MemPageReader implements PageReader {
   private static final Log LOG = Log.getLog(MemPageReader.class);
 
   private final long totalValueCount;
-  private final Iterator<Page> pages;
+  private final Iterator<DataPage> pages;
   private final DictionaryPage dictionaryPage;
 
-  public MemPageReader(long totalValueCount, Iterator<Page> pages, DictionaryPage dictionaryPage) {
+  public MemPageReader(long totalValueCount, Iterator<DataPage> pages, DictionaryPage dictionaryPage) {
     super();
     checkNotNull(pages, "pages");
     this.totalValueCount = totalValueCount;
@@ -48,9 +48,9 @@ public class MemPageReader implements PageReader {
   }
 
   @Override
-  public Page readPage() {
+  public DataPage readPage() {
     if (pages.hasNext()) {
-      Page next = pages.next();
+      DataPage next = pages.next();
       if (DEBUG) LOG.debug("read page " + next);
       return next;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
index c3a9fd0..6facf34 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageStore.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import parquet.Log;
 import parquet.column.ColumnDescriptor;
 import parquet.column.UnknownColumnException;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
 import parquet.column.page.PageWriteStore;
@@ -58,7 +58,7 @@ public class MemPageStore implements PageReadStore, PageWriteStore {
     if (pageWriter == null) {
       throw new UnknownColumnException(descriptor);
     }
-    List<Page> pages = new ArrayList<Page>(pageWriter.getPages());
+    List<DataPage> pages = new ArrayList<DataPage>(pageWriter.getPages());
     if (Log.DEBUG) LOG.debug("initialize page reader with "+ pageWriter.getTotalValueCount() + " values and " + pages.size() + " pages");
     return new MemPageReader(pageWriter.getTotalValueCount(), pages.iterator(), pageWriter.getDictionaryPage());
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
index 01b0873..c70e023 100644
--- a/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/parquet/column/page/mem/MemPageWriter.java
@@ -16,52 +16,57 @@
 package parquet.column.page.mem;
 
 import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesInput.copy;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.column.Encoding;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
 import parquet.column.page.PageWriter;
 import parquet.column.statistics.Statistics;
 import parquet.io.ParquetEncodingException;
 
-
 public class MemPageWriter implements PageWriter {
   private static final Log LOG = Log.getLog(MemPageWriter.class);
 
-  private final List<Page> pages = new ArrayList<Page>();
+  private final List<DataPage> pages = new ArrayList<DataPage>();
   private DictionaryPage dictionaryPage;
   private long memSize = 0;
   private long totalValueCount = 0;
 
-  @Deprecated
   @Override
-  public void writePage(BytesInput bytesInput, int valueCount, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
+  public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
       throws IOException {
     if (valueCount == 0) {
       throw new ParquetEncodingException("illegal page of 0 values");
     }
     memSize += bytesInput.size();
-    pages.add(new Page(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), rlEncoding, dlEncoding, valuesEncoding));
+    pages.add(new DataPageV1(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), statistics, rlEncoding, dlEncoding, valuesEncoding));
     totalValueCount += valueCount;
     if (DEBUG) LOG.debug("page written for " + bytesInput.size() + " bytes and " + valueCount + " records");
   }
 
   @Override
-  public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
-      throws IOException {
+  public void writePageV2(int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data, Statistics<?> statistics) throws IOException {
     if (valueCount == 0) {
       throw new ParquetEncodingException("illegal page of 0 values");
     }
-    memSize += bytesInput.size();
-    pages.add(new Page(BytesInput.copy(bytesInput), valueCount, (int)bytesInput.size(), statistics, rlEncoding, dlEncoding, valuesEncoding));
+    long size = repetitionLevels.size() + definitionLevels.size() + data.size();
+    memSize += size;
+    pages.add(DataPageV2.uncompressed(rowCount, nullCount, valueCount, copy(repetitionLevels), copy(definitionLevels), dataEncoding, copy(data), statistics));
     totalValueCount += valueCount;
-    if (DEBUG) LOG.debug("page written for " + bytesInput.size() + " bytes and " + valueCount + " records");
+    if (DEBUG) LOG.debug("page written for " + size + " bytes and " + valueCount + " records");
+
   }
 
   @Override
@@ -69,7 +74,7 @@ public class MemPageWriter implements PageWriter {
     return memSize;
   }
 
-  public List<Page> getPages() {
+  public List<DataPage> getPages() {
     return pages;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java
index da46b51..9cd31e3 100644
--- a/parquet-column/src/test/java/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -25,7 +25,7 @@ import java.util.logging.Level;
 
 import parquet.Log;
 import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
 import parquet.column.page.mem.MemPageStore;
 import parquet.example.DummyRecordConverter;
 import parquet.example.data.GroupWriter;
@@ -74,7 +74,7 @@ public class PerfTest {
 
 
   private static void write(MemPageStore memPageStore) {
-    ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
     MessageColumnIO columnIO = newColumnFactory(schema);
 
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
@@ -90,7 +90,7 @@ public class PerfTest {
     write(memPageStore, groupWriter, 1000000);
     columns.flush();
     System.out.println();
-    System.out.println(columns.memSize()+" bytes used total");
+    System.out.println(columns.getBufferedSize() + " bytes used total");
     System.out.println("max col size: "+columns.maxColMemSize()+" bytes");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
index dddbcca..d4442df 100644
--- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
@@ -34,20 +34,18 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import parquet.Log;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnWriteStore;
 import parquet.column.ColumnWriter;
 import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.statistics.Statistics;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.mem.MemPageStore;
 import parquet.example.data.Group;
@@ -285,7 +283,7 @@ public class TestColumnIO {
 
   private void writeGroups(MessageType writtenSchema, MemPageStore memPageStore, Group... groups) {
     ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
-    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
     MessageColumnIO columnIO = columnIOFactory.getColumnIO(writtenSchema);
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), writtenSchema);
     for (Group group : groups) {
@@ -303,7 +301,7 @@ public class TestColumnIO {
     log(r2);
 
     MemPageStore memPageStore = new MemPageStore(2);
-    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
 
     ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
     {
@@ -453,7 +451,7 @@ public class TestColumnIO {
 
   private void testSchema(MessageType messageSchema, List<Group> groups) {
     MemPageStore memPageStore = new MemPageStore(groups.size());
-    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
 
     ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
     MessageColumnIO columnIO = columnIOFactory.getColumnIO(messageSchema);
@@ -505,7 +503,7 @@ public class TestColumnIO {
   @Test
   public void testPushParser() {
     MemPageStore memPageStore = new MemPageStore(1);
-    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
     MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
     new GroupWriter(columnIO.getRecordWriter(columns), schema).write(r1);
     columns.flush();
@@ -515,14 +513,14 @@ public class TestColumnIO {
 
   }
 
-  private ColumnWriteStoreImpl newColumnWriteStore(MemPageStore memPageStore) {
-    return new ColumnWriteStoreImpl(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+  private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
+    return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
   }
 
   @Test
   public void testEmptyField() {
     MemPageStore memPageStore = new MemPageStore(1);
-    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
     MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
     final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     recordWriter.startMessage();
@@ -579,77 +577,95 @@ public class TestColumnIO {
         "[Name, Url]: http://C, r:0, d:2",
         "[Name, Language, Code]: null, r:0, d:1",
         "[Name, Language, Country]: null, r:0, d:1"
-
     };
 
-    ColumnWriteStore columns = new ColumnWriteStore() {
-      int counter = 0;
+    ValidatingColumnWriteStore columns = new ValidatingColumnWriteStore(expected);
+    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+    groupWriter.write(r1);
+    groupWriter.write(r2);
+    columns.validate();
+  }
+}
+final class ValidatingColumnWriteStore implements ColumnWriteStore {
+  private final String[] expected;
+  int counter = 0;
+
+  ValidatingColumnWriteStore(String[] expected) {
+    this.expected = expected;
+  }
+
+  @Override
+  public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
+    return new ColumnWriter() {
+      private void validate(Object value, int repetitionLevel,
+          int definitionLevel) {
+        String actual = Arrays.toString(path.getPath())+": "+value+", r:"+repetitionLevel+", d:"+definitionLevel;
+        assertEquals("event #" + counter, expected[counter], actual);
+        ++ counter;
+      }
+
+      @Override
+      public void writeNull(int repetitionLevel, int definitionLevel) {
+        validate(null, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(Binary value, int repetitionLevel, int definitionLevel) {
+        validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(boolean value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(int value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(long value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
 
       @Override
-      public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
-        return new ColumnWriter() {
-          private void validate(Object value, int repetitionLevel,
-              int definitionLevel) {
-            String actual = Arrays.toString(path.getPath())+": "+value+", r:"+repetitionLevel+", d:"+definitionLevel;
-            assertEquals("event #" + counter, expected[counter], actual);
-            ++ counter;
-          }
-
-          @Override
-          public void writeNull(int repetitionLevel, int definitionLevel) {
-            validate(null, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(Binary value, int repetitionLevel, int definitionLevel) {
-            validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(boolean value, int repetitionLevel, int definitionLevel) {
-            validate(value, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(int value, int repetitionLevel, int definitionLevel) {
-            validate(value, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(long value, int repetitionLevel, int definitionLevel) {
-            validate(value, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(float value, int repetitionLevel, int definitionLevel) {
-            validate(value, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void write(double value, int repetitionLevel, int definitionLevel) {
-            validate(value, repetitionLevel, definitionLevel);
-          }
-
-          @Override
-          public void flush() {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
-          public long getBufferedSizeInMemory() {
-            throw new UnsupportedOperationException();
-          }
-        };
+      public void write(float value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
       }
+
       @Override
-      public void flush() {
-        assertEquals("read all events", expected.length, counter);
+      public void write(double value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
       }
     };
-    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
-    groupWriter.write(r1);
-    groupWriter.write(r2);
-    columns.flush();
+  }
+
+  public void validate() {
+    assertEquals("read all events", expected.length, counter);
+  }
+
+  @Override
+  public void endRecord() {
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return 0;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return 0;
+  }
+
+  @Override
+  public String memUsageString() {
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/test/java/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
index 0107b36..7acf6f1 100644
--- a/parquet-column/src/test/java/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.junit.Test;
 
 import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
 import parquet.column.page.mem.MemPageStore;
 import parquet.example.data.Group;
 import parquet.example.data.GroupWriter;
@@ -254,7 +254,7 @@ public class TestFiltered {
 
   private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
     MemPageStore memPageStore = new MemPageStore(number * 2);
-    ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
 
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
     for ( int i = 0; i < number; i++ ) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index b134264..198b654 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -15,6 +15,9 @@
  */
 package parquet.format.converter;
 
+import static parquet.format.Util.readFileMetaData;
+import static parquet.format.Util.writePageHeader;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -29,13 +32,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.io.UTF8;
 import parquet.Log;
 import parquet.common.schema.ColumnPath;
 import parquet.format.ColumnChunk;
 import parquet.format.ColumnMetaData;
 import parquet.format.ConvertedType;
 import parquet.format.DataPageHeader;
+import parquet.format.DataPageHeaderV2;
 import parquet.format.DictionaryPageHeader;
 import parquet.format.Encoding;
 import parquet.format.FieldRepetitionType;
@@ -60,9 +63,6 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.TypeVisitor;
 import parquet.schema.Types;
-import static java.lang.Math.min;
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
 
 public class ParquetMetadataConverter {
   private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
@@ -671,14 +671,49 @@ public class ParquetMetadataConverter {
       parquet.column.Encoding valuesEncoding) {
     PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
     // TODO: pageHeader.crc = ...;
-    pageHeader.data_page_header = new DataPageHeader(
+    pageHeader.setData_page_header(new DataPageHeader(
         valueCount,
         getEncoding(valuesEncoding),
         getEncoding(dlEncoding),
-        getEncoding(rlEncoding));
+        getEncoding(rlEncoding)));
+    if (!statistics.isEmpty()) {
+      pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
+    }
+    return pageHeader;
+  }
+
+  public void writeDataPageV2Header(
+      int uncompressedSize, int compressedSize,
+      int valueCount, int nullCount, int rowCount,
+      parquet.column.statistics.Statistics statistics,
+      parquet.column.Encoding dataEncoding,
+      int rlByteLength, int dlByteLength,
+      OutputStream to) throws IOException {
+    writePageHeader(
+        newDataPageV2Header(
+            uncompressedSize, compressedSize,
+            valueCount, nullCount, rowCount,
+            statistics,
+            dataEncoding,
+            rlByteLength, dlByteLength), to);
+  }
+
+  private PageHeader newDataPageV2Header(
+      int uncompressedSize, int compressedSize,
+      int valueCount, int nullCount, int rowCount,
+      parquet.column.statistics.Statistics<?> statistics,
+      parquet.column.Encoding dataEncoding,
+      int rlByteLength, int dlByteLength) {
+    // TODO: pageHeader.crc = ...;
+    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+        valueCount, nullCount, rowCount,
+        getEncoding(dataEncoding),
+        dlByteLength, rlByteLength);
     if (!statistics.isEmpty()) {
-      pageHeader.data_page_header.setStatistics(toParquetStatistics(statistics));
+      dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
     }
+    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
+    pageHeader.setData_page_header_v2(dataPageHeaderV2);
     return pageHeader;
   }
 
@@ -686,7 +721,7 @@ public class ParquetMetadataConverter {
       int uncompressedSize, int compressedSize, int valueCount,
       parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
-    pageHeader.dictionary_page_header = new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding));
+    pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
     writePageHeader(pageHeader, to);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
index 2910b9f..b6809a4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -21,13 +21,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import parquet.Ints;
 import parquet.Log;
 import parquet.column.ColumnDescriptor;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
+import parquet.io.ParquetDecodingException;
 
 /**
  * TODO: should this actually be called RowGroupImpl or something?
@@ -49,15 +53,15 @@ class ColumnChunkPageReadStore implements PageReadStore {
 
     private final BytesDecompressor decompressor;
     private final long valueCount;
-    private final List<Page> compressedPages;
+    private final List<DataPage> compressedPages;
     private final DictionaryPage compressedDictionaryPage;
 
-    ColumnChunkPageReader(BytesDecompressor decompressor, List<Page> compressedPages, DictionaryPage compressedDictionaryPage) {
+    ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
       this.decompressor = decompressor;
-      this.compressedPages = new LinkedList<Page>(compressedPages);
+      this.compressedPages = new LinkedList<DataPage>(compressedPages);
       this.compressedDictionaryPage = compressedDictionaryPage;
       int count = 0;
-      for (Page p : compressedPages) {
+      for (DataPage p : compressedPages) {
         count += p.getValueCount();
       }
       this.valueCount = count;
@@ -69,23 +73,53 @@ class ColumnChunkPageReadStore implements PageReadStore {
     }
 
     @Override
-    public Page readPage() {
+    public DataPage readPage() {
       if (compressedPages.isEmpty()) {
         return null;
       }
-      Page compressedPage = compressedPages.remove(0);
-      try {
-        return new Page(
-            decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
-            compressedPage.getValueCount(),
-            compressedPage.getUncompressedSize(),
-            compressedPage.getStatistics(),
-            compressedPage.getRlEncoding(),
-            compressedPage.getDlEncoding(),
-            compressedPage.getValueEncoding());
-      } catch (IOException e) {
-        throw new RuntimeException(e); // TODO: cleanup
-      }
+      DataPage compressedPage = compressedPages.remove(0);
+      return compressedPage.accept(new DataPage.Visitor<DataPage>() {
+        @Override
+        public DataPage visit(DataPageV1 dataPageV1) {
+          try {
+            return new DataPageV1(
+                decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
+                dataPageV1.getValueCount(),
+                dataPageV1.getUncompressedSize(),
+                dataPageV1.getStatistics(),
+                dataPageV1.getRlEncoding(),
+                dataPageV1.getDlEncoding(),
+                dataPageV1.getValueEncoding());
+          } catch (IOException e) {
+            throw new ParquetDecodingException("could not decompress page", e);
+          }
+        }
+
+        @Override
+        public DataPage visit(DataPageV2 dataPageV2) {
+          if (!dataPageV2.isCompressed()) {
+            return dataPageV2;
+          }
+          try {
+            int uncompressedSize = Ints.checkedCast(
+                dataPageV2.getUncompressedSize()
+                - dataPageV2.getDefinitionLevels().size()
+                - dataPageV2.getRepetitionLevels().size());
+            return DataPageV2.uncompressed(
+                dataPageV2.getRowCount(),
+                dataPageV2.getNullCount(),
+                dataPageV2.getValueCount(),
+                dataPageV2.getRepetitionLevels(),
+                dataPageV2.getDefinitionLevels(),
+                dataPageV2.getDataEncoding(),
+                decompressor.decompress(dataPageV2.getData(), uncompressedSize),
+                dataPageV2.getStatistics()
+                );
+          } catch (IOException e) {
+            throw new ParquetDecodingException("could not decompress page", e);
+          }
+        }
+      });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index 6d7f685..64fb7cd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -34,7 +33,6 @@ import parquet.column.page.DictionaryPage;
 import parquet.column.page.PageWriteStore;
 import parquet.column.page.PageWriter;
 import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.io.ParquetEncodingException;
@@ -69,10 +67,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
     }
 
-    @Deprecated
     @Override
     public void writePage(BytesInput bytes,
                           int valueCount,
+                          Statistics statistics,
                           Encoding rlEncoding,
                           Encoding dlEncoding,
                           Encoding valuesEncoding) throws IOException {
@@ -80,16 +78,15 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       if (uncompressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write page larger than Integer.MAX_VALUE bytes: " +
-                uncompressedSize);
+            uncompressedSize);
       }
       BytesInput compressedBytes = compressor.compress(bytes);
       long compressedSize = compressedBytes.size();
       if (compressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
-                + compressedSize);
+            + compressedSize);
       }
-      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
       parquetMetadataConverter.writeDataPageHeader(
           (int)uncompressedSize,
           (int)compressedSize,
@@ -103,6 +100,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
       compressedBytes.writeAllTo(buf);
       encodings.add(rlEncoding);
       encodings.add(dlEncoding);
@@ -110,43 +108,46 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     }
 
     @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Statistics statistics,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      if (uncompressedSize > Integer.MAX_VALUE) {
-        throw new ParquetEncodingException(
-            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
-            uncompressedSize);
-      }
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      if (compressedSize > Integer.MAX_VALUE) {
-        throw new ParquetEncodingException(
-            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
-            + compressedSize);
-      }
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
+    public void writePageV2(
+        int rowCount, int nullCount, int valueCount,
+        BytesInput repetitionLevels, BytesInput definitionLevels,
+        Encoding dataEncoding, BytesInput data,
+        Statistics<?> statistics) throws IOException {
+      int rlByteLength = toIntWithCheck(repetitionLevels.size());
+      int dlByteLength = toIntWithCheck(definitionLevels.size());
+      int uncompressedSize = toIntWithCheck(
+          data.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      // TODO: decide if we compress
+      BytesInput compressedData = compressor.compress(data);
+      int compressedSize = toIntWithCheck(
+          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize, compressedSize,
+          valueCount, nullCount, rowCount,
           statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
+          dataEncoding,
+          rlByteLength, dlByteLength,
           buf);
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
       this.totalStatistics.mergeStatistics(statistics);
-      compressedBytes.writeAllTo(buf);
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
+      repetitionLevels.writeAllTo(buf);
+      definitionLevels.writeAllTo(buf);
+      compressedData.writeAllTo(buf);
+      encodings.add(dataEncoding);
+    }
+
+    private int toIntWithCheck(long size) {
+      if (size > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+            size);
+      }
+      return (int)size;
     }
 
     @Override
@@ -199,28 +200,20 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
   }
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
-  private final MessageType schema;
-  private final BytesCompressor compressor;
-  private final int initialSize;
 
   public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
-    this.compressor = compressor;
-    this.schema = schema;
-    this.initialSize = initialSize;
+    for (ColumnDescriptor path : schema.getColumns()) {
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSize));
+    }
   }
 
   @Override
   public PageWriter getPageWriter(ColumnDescriptor path) {
-    if (!writers.containsKey(path)) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSize));
-    }
     return writers.get(path);
   }
 
   public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    List<ColumnDescriptor> columns = schema.getColumns();
-    for (ColumnDescriptor columnDescriptor : columns) {
-      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+    for (ColumnChunkPageWriter pageWriter : writers.values()) {
       pageWriter.writeToFileWriter(writer);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index d73c811..79ef5ac 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 import parquet.Log;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ParquetProperties;
 import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
+import parquet.column.impl.ColumnWriteStoreV2;
 import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.hadoop.api.WriteSupport;
 import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -49,17 +52,16 @@ class InternalParquetRecordWriter<T> {
   private final int rowGroupSize;
   private final int pageSize;
   private final BytesCompressor compressor;
-  private final int dictionaryPageSize;
-  private final boolean enableDictionary;
   private final boolean validating;
-  private final WriterVersion writerVersion;
+  private final ParquetProperties parquetProperties;
 
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
-  private ColumnWriteStoreImpl columnStore;
+  private ColumnWriteStore columnStore;
   private ColumnChunkPageWriteStore pageStore;
 
+
   /**
    * @param parquetFileWriter the file to write to
    * @param writeSupport the class to convert incoming records
@@ -87,10 +89,8 @@ class InternalParquetRecordWriter<T> {
     this.rowGroupSize = rowGroupSize;
     this.pageSize = pageSize;
     this.compressor = compressor;
-    this.dictionaryPageSize = dictionaryPageSize;
-    this.enableDictionary = enableDictionary;
     this.validating = validating;
-    this.writerVersion = writerVersion;
+    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
     initStore();
   }
 
@@ -103,7 +103,11 @@ class InternalParquetRecordWriter<T> {
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    columnStore = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+    columnStore = parquetProperties.newColumnWriteStore(
+        schema,
+        pageStore,
+        pageSize,
+        initialPageBufferSize);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
   }
@@ -124,7 +128,7 @@ class InternalParquetRecordWriter<T> {
 
   private void checkBlockSizeReached() throws IOException {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = columnStore.memSize();
+      long memSize = columnStore.getBufferedSize();
       if (memSize > rowGroupSize) {
         LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSize, recordCount));
         flushRowGroupToStore();
@@ -143,8 +147,8 @@ class InternalParquetRecordWriter<T> {
 
   private void flushRowGroupToStore()
       throws IOException {
-    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.allocatedSize()));
-    if (columnStore.allocatedSize() > 3 * (long)rowGroupSize) {
+    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
+    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSize) {
       LOG.warn("Too much memory used: " + columnStore.memUsageString());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index 74d65fe..47308c5 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -19,7 +19,10 @@ import static parquet.Log.DEBUG;
 import static parquet.bytes.BytesUtils.readIntLittleEndian;
 import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static parquet.hadoop.ParquetFileWriter.*;
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import static parquet.hadoop.ParquetFileWriter.MAGIC;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
@@ -29,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,15 +50,19 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.Utils;
 
 import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
 import parquet.column.page.PageReadStore;
 import parquet.common.schema.ColumnPath;
+import parquet.format.DataPageHeader;
+import parquet.format.DataPageHeaderV2;
+import parquet.format.DictionaryPageHeader;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
@@ -81,7 +87,7 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
-  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+  private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
 
   /**
    * for files provided, check if there's a summary file.
@@ -423,7 +429,7 @@ public class ParquetFileReader implements Closeable {
         throw new RuntimeException("corrupted file: the footer index is not within the file");
       }
       f.seek(footerIndex);
-      return parquetMetadataConverter.readParquetMetadata(f, filter);
+      return converter.readParquetMetadata(f, filter);
     } finally {
       f.close();
     }
@@ -535,41 +541,63 @@ public class ParquetFileReader implements Closeable {
      * @return the list of pages
      */
     public ColumnChunkPageReader readAllPages() throws IOException {
-      List<Page> pagesInChunk = new ArrayList<Page>();
+      List<DataPage> pagesInChunk = new ArrayList<DataPage>();
       DictionaryPage dictionaryPage = null;
       long valuesCountReadSoFar = 0;
       while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
         PageHeader pageHeader = readPageHeader();
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
         switch (pageHeader.type) {
           case DICTIONARY_PAGE:
             // there is only one dictionary page per column chunk
             if (dictionaryPage != null) {
               throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
             }
-            dictionaryPage =
+          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+          dictionaryPage =
                 new DictionaryPage(
-                    this.readAsBytesInput(pageHeader.compressed_page_size),
-                    pageHeader.uncompressed_page_size,
-                    pageHeader.dictionary_page_header.num_values,
-                    parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                    this.readAsBytesInput(compressedPageSize),
+                    uncompressedPageSize,
+                    dicHeader.getNum_values(),
+                    converter.getEncoding(dicHeader.getEncoding())
                     );
             break;
           case DATA_PAGE:
+            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
             pagesInChunk.add(
-                new Page(
-                    this.readAsBytesInput(pageHeader.compressed_page_size),
-                    pageHeader.data_page_header.num_values,
-                    pageHeader.uncompressed_page_size,
-                    ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
-                    parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-                    parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-                    parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+                new DataPageV1(
+                    this.readAsBytesInput(compressedPageSize),
+                    dataHeaderV1.getNum_values(),
+                    uncompressedPageSize,
+                    fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
+                    converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+                    converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+                    converter.getEncoding(dataHeaderV1.getEncoding())
                     ));
-            valuesCountReadSoFar += pageHeader.data_page_header.num_values;
+            valuesCountReadSoFar += dataHeaderV1.getNum_values();
+            break;
+          case DATA_PAGE_V2:
+            DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+            int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
+            pagesInChunk.add(
+                new DataPageV2(
+                    dataHeaderV2.getNum_rows(),
+                    dataHeaderV2.getNum_nulls(),
+                    dataHeaderV2.getNum_values(),
+                    this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+                    this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+                    converter.getEncoding(dataHeaderV2.getEncoding()),
+                    this.readAsBytesInput(dataSize),
+                    uncompressedPageSize,
+                    fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
+                    dataHeaderV2.isIs_compressed()
+                    ));
+            valuesCountReadSoFar += dataHeaderV2.getNum_values();
             break;
           default:
-            if (DEBUG) LOG.debug("skipping page of type " + pageHeader.type + " of size " + pageHeader.compressed_page_size);
-            this.skip(pageHeader.compressed_page_size);
+            if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
+            this.skip(compressedPageSize);
             break;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..f499d1a
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -0,0 +1,107 @@
+package parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.bytes.LittleEndianDataInputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DataPageV2;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BinaryStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestColumnChunkPageWriteStore {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
+    Path root = file.getParent();
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(root)) {
+      fs.delete(root, true);
+    }
+    fs.mkdirs(root);
+    CodecFactory f = new CodecFactory(conf);
+    int pageSize = 1024;
+    int initialSize = 1024;
+    MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    Encoding dataEncoding = Encoding.PLAIN;
+    int valueCount = 10;
+    int d = 1;
+    int r = 2;
+    int v = 3;
+    BytesInput definitionLevels = BytesInput.fromInt(d);
+    BytesInput repetitionLevels = BytesInput.fromInt(r);
+    Statistics<?> statistics = new BinaryStatistics();
+    BytesInput data = BytesInput.fromInt(v);
+    int rowCount = 5;
+    int nullCount = 1;
+    CompressionCodecName codec = GZIP;
+
+    {
+      ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+      writer.start();
+      writer.startBlock(rowCount);
+      {
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize);
+        PageWriter pageWriter = store.getPageWriter(col);
+        pageWriter.writePageV2(
+            rowCount, nullCount, valueCount,
+            repetitionLevels, definitionLevels,
+            dataEncoding, data,
+            statistics);
+        store.flushToFileWriter(writer);
+      }
+      writer.endBlock();
+      writer.end(new HashMap<String, String>());
+    }
+
+    {
+      ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
+      ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
+      PageReadStore rowGroup = reader.readNextRowGroup();
+      PageReader pageReader = rowGroup.getPageReader(col);
+      DataPageV2 page = (DataPageV2)pageReader.readPage();
+      assertEquals(rowCount, page.getRowCount());
+      assertEquals(nullCount, page.getNullCount());
+      assertEquals(valueCount, page.getValueCount());
+      assertEquals(d, intValue(page.getDefinitionLevels()));
+      assertEquals(r, intValue(page.getRepetitionLevels()));
+      assertEquals(dataEncoding, page.getDataEncoding());
+      assertEquals(v, intValue(page.getData()));
+      assertEquals(statistics.toString(), page.getStatistics().toString());
+      reader.close();
+    }
+  }
+
+  private int intValue(BytesInput in) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    in.writeAllTo(baos);
+    LittleEndianDataInputStream os = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    int i = os.readInt();
+    os.close();
+    return i;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
index c86753e..1d45469 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
@@ -42,7 +42,8 @@ import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
 import parquet.column.Encoding;
-import parquet.column.page.Page;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
 import parquet.column.statistics.BinaryStatistics;
@@ -369,9 +370,9 @@ public class TestParquetFileWriter {
 
   private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
     PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
-    Page page = pageReader.readPage();
+    DataPage page = pageReader.readPage();
     assertEquals(values, page.getValueCount());
-    assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
+    assertArrayEquals(bytes.toByteArray(), ((DataPageV1)page).getBytes().toByteArray());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
new file mode 100644
index 0000000..23a1f13
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriterNewPage.java
@@ -0,0 +1,117 @@
+package parquet.hadoop;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.PLAIN_DICTIONARY;
+import static parquet.column.Encoding.RLE_DICTIONARY;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.hadoop.ParquetFileReader.readFooter;
+import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import parquet.column.Encoding;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroupFactory;
+import parquet.hadoop.example.GroupReadSupport;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+
+public class TestParquetWriterNewPage {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestParquetWriter/");
+    FileSystem fs = root.getFileSystem(conf);
+    if (fs.exists(root)) {
+      fs.delete(root, true);
+    }
+    fs.mkdirs(root);
+    MessageType schema = parseMessageType(
+        "message test { "
+        + "required binary binary_field; "
+        + "required int32 int32_field; "
+        + "required int64 int64_field; "
+        + "required boolean boolean_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "required fixed_len_byte_array(3) flba_field; "
+        + "required int96 int96_field; "
+        + "optional binary null_field; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    Map<String, Encoding> expected = new HashMap<String, Encoding>();
+    expected.put("10-" + PARQUET_1_0, PLAIN_DICTIONARY);
+    expected.put("1000-" + PARQUET_1_0, PLAIN);
+    expected.put("10-" + PARQUET_2_0, RLE_DICTIONARY);
+    expected.put("1000-" + PARQUET_2_0, DELTA_BYTE_ARRAY);
+    for (int modulo : asList(10, 1000)) {
+      for (WriterVersion version : WriterVersion.values()) {
+        Path file = new Path(root, version.name() + "_" + modulo);
+        ParquetWriter<Group> writer = new ParquetWriter<Group>(
+            file,
+            new GroupWriteSupport(),
+            UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+        for (int i = 0; i < 1000; i++) {
+          writer.write(
+              f.newGroup()
+              .append("binary_field", "test" + (i % modulo))
+              .append("int32_field", 32)
+              .append("int64_field", 64l)
+              .append("boolean_field", true)
+              .append("float_field", 1.0f)
+              .append("double_field", 2.0d)
+              .append("flba_field", "foo")
+              .append("int96_field", Binary.fromByteArray(new byte[12])));
+        }
+        writer.close();
+
+        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
+        for (int i = 0; i < 1000; i++) {
+          Group group = reader.read();
+          assertEquals("test" + (i % modulo), group.getBinary("binary_field", 0).toStringUsingUTF8());
+          assertEquals(32, group.getInteger("int32_field", 0));
+          assertEquals(64l, group.getLong("int64_field", 0));
+          assertEquals(true, group.getBoolean("boolean_field", 0));
+          assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+          assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+          assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+          assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+          assertEquals(0, group.getFieldRepetitionCount("null_field"));
+        }
+        reader.close();
+        ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+        for (BlockMetaData blockMetaData : footer.getBlocks()) {
+          for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
+            if (column.getPath().toDotString().equals("binary_field")) {
+              String key = modulo + "-" + version;
+              Encoding expectedEncoding = expected.get(key);
+              assertTrue(
+                  key + ":" + column.getEncodings() + " should contain " + expectedEncoding,
+                  column.getEncodings().contains(expectedEncoding));
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java b/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
deleted file mode 100644
index 24d634e..0000000
--- a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.pig;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.column.page.Page;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.column.page.mem.MemPageStore;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-
-public class GenerateIntTestFile {
-  private static final Log LOG = Log.getLog(GenerateIntTestFile.class);
-
-  public static void main(String[] args) throws Throwable {
-    File out = new File("testdata/from_java/int_test_file");
-    if (out.exists()) {
-      if (!out.delete()) {
-        throw new RuntimeException("can not remove existing file " + out.getAbsolutePath());
-      }
-    }
-    Path testFile = new Path(out.toURI());
-    Configuration configuration = new Configuration();
-    {
-      MessageType schema = new MessageType("int_test_file", new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int_col"));
-
-      MemPageStore pageStore = new MemPageStore(100);
-      ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 8*1024, 8*1024, 8*1024, false, WriterVersion.PARQUET_1_0);
-      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-
-      RecordConsumer recordWriter = columnIO.getRecordWriter(store);
-
-      int recordCount = 0;
-      for (int i = 0; i < 100; i++) {
-        recordWriter.startMessage();
-        recordWriter.startField("int_col", 0);
-        if (i % 10 != 0) {
-          recordWriter.addInteger(i);
-        }
-        recordWriter.endField("int_col", 0);
-        recordWriter.endMessage();
-        ++ recordCount;
-      }
-      store.flush();
-
-
-      writeToFile(testFile, configuration, schema, pageStore, recordCount);
-    }
-
-    {
-      readTestFile(testFile, configuration);
-    }
-  }
-
-  public static void readTestFile(Path testFile, Configuration configuration)
-      throws IOException {
-    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testFile);
-    MessageType schema = readFooter.getFileMetaData().getSchema();
-    ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testFile, readFooter.getBlocks(), schema.getColumns());
-    PageReadStore pages = parquetFileReader.readNextRowGroup();
-    System.out.println(pages.getRowCount());
-  }
-
-  public static void writeToFile(Path file, Configuration configuration, MessageType schema, MemPageStore pageStore, int recordCount)
-      throws IOException {
-    ParquetFileWriter w = startFile(file, configuration, schema);
-    writeBlock(schema, pageStore, recordCount, w);
-    endFile(w);
-  }
-
-  public static void endFile(ParquetFileWriter w) throws IOException {
-    w.end(new HashMap<String, String>());
-  }
-
-  public static void writeBlock(MessageType schema, MemPageStore pageStore,
-      int recordCount, ParquetFileWriter w) throws IOException {
-    w.startBlock(recordCount);
-    List<ColumnDescriptor> columns = schema.getColumns();
-    for (ColumnDescriptor columnDescriptor : columns) {
-      PageReader pageReader = pageStore.getPageReader(columnDescriptor);
-      long totalValueCount = pageReader.getTotalValueCount();
-      w.startColumn(columnDescriptor, totalValueCount, CompressionCodecName.UNCOMPRESSED);
-      int n = 0;
-      do {
-        Page page = pageReader.readPage();
-        n += page.getValueCount();
-        // TODO: change INTFC
-        w.writeDataPage(
-            page.getValueCount(),
-            (int)page.getBytes().size(),
-            BytesInput.from(page.getBytes().toByteArray()),
-            page.getRlEncoding(),
-            page.getDlEncoding(),
-            page.getValueEncoding());
-      } while (n < totalValueCount);
-      w.endColumn();
-    }
-    w.endBlock();
-  }
-
-  public static ParquetFileWriter startFile(Path file,
-      Configuration configuration, MessageType schema) throws IOException {
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, file);
-    w.start();
-    return w;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java b/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
deleted file mode 100644
index 106dc30..0000000
--- a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.pig;
-
-import static parquet.pig.GenerateIntTestFile.readTestFile;
-import static parquet.pig.GenerateIntTestFile.writeToFile;
-
-import java.io.File;
-import java.io.IOException;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.column.page.mem.MemPageStore;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-
-public class GenerateTPCH {
-  private static final Log LOG = Log.getLog(GenerateTPCH.class);
-
-  public static void main(String[] args) throws IOException {
-    File out = new File("testdata/from_java/tpch/customer");
-    if (out.exists()) {
-      if (!out.delete()) {
-        throw new RuntimeException("can not remove existing file " + out.getAbsolutePath());
-      }
-    }
-    Path testFile = new Path(out.toURI());
-    Configuration configuration = new Configuration();
-    MessageType schema = new MessageType("customer",
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32,  "c_custkey"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_name"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_address"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32,  "c_nationkey"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_phone"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "c_acctbal"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_mktsegment"),
-        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment")
-        );
-
-    MemPageStore pageStore = new MemPageStore(150000);
-    ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 20*1024, 1*1024, 20*1024, false, WriterVersion.PARQUET_1_0);
-    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-
-    RecordConsumer recordWriter = columnIO.getRecordWriter(store);
-
-    int recordCount = 0;
-    for (int i = 0; i < 150000; i++) {
-      recordWriter.startMessage();
-      writeField(recordWriter, 0, "c_custkey", i % 10 == 0 ? null : i);
-      writeField(recordWriter, 1, "c_name", i % 11 == 0 ? null : "name_" + i);
-      writeField(recordWriter, 2, "c_address", i % 12 == 0 ? null : "add_" + i);
-      writeField(recordWriter, 3, "c_nationkey", i % 13 == 0 ? null : i);
-      writeField(recordWriter, 4, "c_phone", i % 14 == 0 ? null : "phone_" + i);
-      writeField(recordWriter, 5, "c_acctbal", i % 15 == 0 ? null : 1.2d * i);
-      writeField(recordWriter, 6, "c_mktsegment", i % 16 == 0 ? null : "mktsegment_" + i);
-      writeField(recordWriter, 7, "c_comment", i % 17 == 0 ? null : "comment_" + i);
-      recordWriter.endMessage();
-      ++ recordCount;
-    }
-    store.flush();
-    System.out.printf("mem size %,d, maxColSize %,d, allocated %,d\n", store.memSize(), store.maxColMemSize(), store.allocatedSize());
-    System.out.println(store.memUsageString());
-    writeToFile(testFile, configuration, schema, pageStore, recordCount);
-
-    try {
-      readTestFile(testFile, configuration);
-    } catch (Exception e) {
-      LOG.error("failed reading", e);
-    }
-
-  }
-
-  private static void writeField(RecordConsumer recordWriter, int index, String name, Object value) {
-    if (value != null) {
-      recordWriter.startField(name, index);
-      if (value instanceof Integer) {
-        recordWriter.addInteger((Integer)value);
-      } else if (value instanceof String) {
-        recordWriter.addBinary(Binary.fromString((String)value));
-      } else if (value instanceof Double) {
-        recordWriter.addDouble((Double)value);
-      } else {
-        throw new IllegalArgumentException(value.getClass().getName() + " not supported");
-      }
-      recordWriter.endField(name, index);
-    }
-  }
-}