You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ti...@apache.org on 2014/12/04 22:16:26 UTC

[3/3] incubator-parquet-mr git commit: PARQUET-117: implement the new page format for Parquet 2.0

PARQUET-117: implement the new page format for Parquet 2.0

The new page format was defined some time ago:
https://github.com/Parquet/parquet-format/pull/64
https://github.com/Parquet/parquet-format/issues/44
The goals are the following:
 - cut pages on record boundaries to facilitate skipping pages in predicate push down
 - read repetition levels (rl) and definition levels (dl) independently of the data
 - make compression of the data optional

Author: julien <ju...@twitter.com>

Closes #75 from julienledem/new_page_format and squashes the following commits:

fbbc23a [julien] make mvn install display output only if it fails
4189383 [julien] save output lines as travis cuts after 10000
44d3684 [julien] fix parquet-tools for new page format
0fb8c15 [julien] Merge branch 'master' into new_page_format
5880cbb [julien] Merge branch 'master' into new_page_format
6ee7303 [julien] make parquet.column package not semver compliant
42f6c9f [julien] add tests and fix bugs
266302b [julien] fix write path
4e76369 [julien] read path
050a487 [julien] fix compilation
e0e9d00 [julien] better ColumnWriterStore definition
ecf04ce [julien] remove unnecessary change
2bc4d01 [julien] first stab at write path for the new page format


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/ccc29e4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/ccc29e4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/ccc29e4d

Branch: refs/heads/master
Commit: ccc29e4dde24584118211f27c71bb01bacc39326
Parents: b5f6a3b
Author: julien <ju...@twitter.com>
Authored: Thu Dec 4 13:16:11 2014 -0800
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Thu Dec 4 13:16:11 2014 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 .../java/parquet/column/ColumnWriteStore.java   |  22 ++
 .../main/java/parquet/column/ColumnWriter.java  |  12 -
 .../java/parquet/column/ParquetProperties.java  |  25 ++
 .../parquet/column/impl/ColumnReaderImpl.java   | 143 +++++++--
 .../column/impl/ColumnWriteStoreImpl.java       | 127 --------
 .../parquet/column/impl/ColumnWriteStoreV1.java | 134 +++++++++
 .../parquet/column/impl/ColumnWriteStoreV2.java | 163 ++++++++++
 .../parquet/column/impl/ColumnWriterImpl.java   | 275 -----------------
 .../parquet/column/impl/ColumnWriterV1.java     | 269 +++++++++++++++++
 .../parquet/column/impl/ColumnWriterV2.java     | 295 +++++++++++++++++++
 .../main/java/parquet/column/page/DataPage.java |  50 ++++
 .../java/parquet/column/page/DataPageV1.java    |  80 +++++
 .../java/parquet/column/page/DataPageV2.java    | 138 +++++++++
 .../parquet/column/page/DictionaryPage.java     |  14 +-
 .../src/main/java/parquet/column/page/Page.java | 136 +--------
 .../java/parquet/column/page/PageReader.java    |   2 +-
 .../java/parquet/column/page/PageWriter.java    |  34 ++-
 .../parquet/column/values/ValuesWriter.java     |   3 +-
 .../rle/RunLengthBitPackingHybridDecoder.java   |   7 +-
 .../RunLengthBitPackingHybridValuesWriter.java  |  10 +-
 .../main/java/parquet/io/MessageColumnIO.java   |   3 +
 .../column/impl/TestColumnReaderImpl.java       | 105 +++++++
 .../java/parquet/column/mem/TestMemColumn.java  |  27 +-
 .../parquet/column/mem/TestMemPageStore.java    |   4 +-
 .../parquet/column/page/mem/MemPageReader.java  |  10 +-
 .../parquet/column/page/mem/MemPageStore.java   |   4 +-
 .../parquet/column/page/mem/MemPageWriter.java  |  29 +-
 .../src/test/java/parquet/io/PerfTest.java      |   6 +-
 .../src/test/java/parquet/io/TestColumnIO.java  | 166 ++++++-----
 .../src/test/java/parquet/io/TestFiltered.java  |   4 +-
 .../converter/ParquetMetadataConverter.java     |  51 +++-
 .../hadoop/ColumnChunkPageReadStore.java        |  72 +++--
 .../hadoop/ColumnChunkPageWriteStore.java       |  89 +++---
 .../hadoop/InternalParquetRecordWriter.java     |  28 +-
 .../java/parquet/hadoop/ParquetFileReader.java  |  74 +++--
 .../hadoop/TestColumnChunkPageWriteStore.java   | 107 +++++++
 .../parquet/hadoop/TestParquetFileWriter.java   |   7 +-
 .../hadoop/TestParquetWriterNewPage.java        | 117 ++++++++
 .../java/parquet/pig/GenerateIntTestFile.java   | 142 ---------
 .../src/test/java/parquet/pig/GenerateTPCH.java | 112 -------
 .../java/parquet/pig/TupleConsumerPerfTest.java |   8 +-
 .../parquet/thrift/TestParquetReadProtocol.java |   4 +-
 .../java/parquet/tools/command/DumpCommand.java |  34 ++-
 pom.xml                                         |   2 +-
 45 files changed, 2052 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a75ac3a..ae33a7b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@ before_install:
   - mkdir protobuf_install
   - pushd protobuf_install
   - wget http://protobuf.googlecode.com/files/protobuf-2.5.0.tar.gz
-  - tar xzvf protobuf-2.5.0.tar.gz
+  - tar xzf protobuf-2.5.0.tar.gz
   - cd  protobuf-2.5.0
   - ./configure
   - make
@@ -27,5 +27,5 @@ env:
   - HADOOP_PROFILE=default
   - HADOOP_PROFILE=hadoop-2
 
-install: mvn install -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true
+install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false)
 script: mvn test -P $HADOOP_PROFILE

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
index 9abff4d..4b8121f 100644
--- a/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
+++ b/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
@@ -33,4 +33,26 @@ public interface ColumnWriteStore {
    */
   abstract public void flush();
 
+  /**
+   * Called to notify the store that the current record has ended (a record boundary).
+   */
+  abstract public void endRecord();
+
+  /**
+   * Used for information only.
+   * @return approximate amount of memory allocated by the column writers
+   */
+  abstract public long getAllocatedSize();
+
+  /**
+   * Used to determine when row groups should be flushed to disk.
+   * @return approximate size of the buffered encoded binary data
+   */
+  abstract public long getBufferedSize();
+
+  /**
+   * Used for debugging purposes.
+   * @return a formatted string representing memory usage per column
+   */
+  abstract public String memUsageString();
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ColumnWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/parquet/column/ColumnWriter.java
index 1fd7bba..702fe26 100644
--- a/parquet-column/src/main/java/parquet/column/ColumnWriter.java
+++ b/parquet-column/src/main/java/parquet/column/ColumnWriter.java
@@ -15,7 +15,6 @@
  */
 package parquet.column;
 
-import parquet.column.statistics.Statistics;
 import parquet.io.api.Binary;
 
 /**
@@ -81,16 +80,5 @@ public interface ColumnWriter {
    */
   void writeNull(int repetitionLevel, int definitionLevel);
 
-  /**
-   * Flushes the underlying store. This should be called when there are no
-   * remaining triplets to be written.
-   */
-  void flush();
-
-  /**
-   * used to decide when to write a page or row group
-   * @return the number of bytes of memory used to buffer the current data
-   */
-  long getBufferedSizeInMemory();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
index aea02ad..dc7774f 100644
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
@@ -4,6 +4,9 @@ import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static parquet.column.Encoding.PLAIN;
 import static parquet.column.Encoding.PLAIN_DICTIONARY;
 import static parquet.column.Encoding.RLE_DICTIONARY;
+import parquet.column.impl.ColumnWriteStoreV1;
+import parquet.column.impl.ColumnWriteStoreV2;
+import parquet.column.page.PageWriteStore;
 import parquet.column.values.ValuesWriter;
 import parquet.column.values.boundedint.DevNullValuesWriter;
 import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
@@ -20,6 +23,7 @@ import parquet.column.values.plain.BooleanPlainValuesWriter;
 import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
 import parquet.column.values.plain.PlainValuesWriter;
 import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import parquet.schema.MessageType;
 
 /**
  * This class represents all the configurable Parquet properties.
@@ -195,4 +199,25 @@ public class ParquetProperties {
   public boolean isEnableDictionary() {
     return enableDictionary;
   }
+
+  public ColumnWriteStore newColumnWriteStore(
+      MessageType schema,
+      PageWriteStore pageStore, int pageSize,
+      int initialPageBufferSize) {
+    switch (writerVersion) {
+    case PARQUET_1_0:
+      return new ColumnWriteStoreV1(
+          pageStore,
+          pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
+          enableDictionary, writerVersion);
+    case PARQUET_2_0:
+      return new ColumnWriteStoreV2(
+          schema,
+          pageStore,
+          pageSize, initialPageBufferSize,
+          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+    default:
+      throw new IllegalArgumentException("unknown version " + writerVersion);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
index a58bfd9..dfbdc71 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
@@ -18,18 +18,27 @@ package parquet.column.impl;
 import static java.lang.String.format;
 import static parquet.Log.DEBUG;
 import static parquet.Preconditions.checkNotNull;
+import static parquet.column.ValuesType.DEFINITION_LEVEL;
+import static parquet.column.ValuesType.REPETITION_LEVEL;
+import static parquet.column.ValuesType.VALUES;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnReader;
 import parquet.column.Dictionary;
-import parquet.column.ValuesType;
+import parquet.column.Encoding;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
 import parquet.column.page.PageReader;
 import parquet.column.values.ValuesReader;
+import parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
 import parquet.io.ParquetDecodingException;
 import parquet.io.api.Binary;
 import parquet.io.api.PrimitiveConverter;
@@ -95,7 +104,7 @@ class ColumnReaderImpl implements ColumnReader {
     public long getLong() {
       throw new UnsupportedOperationException();
     }
-    
+
     /**
      * @return current value
      */
@@ -123,8 +132,8 @@ class ColumnReaderImpl implements ColumnReader {
   private final PageReader pageReader;
   private final Dictionary dictionary;
 
-  private ValuesReader repetitionLevelColumn;
-  private ValuesReader definitionLevelColumn;
+  private IntIterator repetitionLevelColumn;
+  private IntIterator definitionLevelColumn;
   protected ValuesReader dataColumn;
 
   private int repetitionLevel;
@@ -478,8 +487,8 @@ class ColumnReaderImpl implements ColumnReader {
 
   // TODO: change the logic around read() to not tie together reading from the 3 columns
   private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.readInteger();
-    definitionLevel = definitionLevelColumn.readInteger();
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
     ++readValues;
   }
 
@@ -497,42 +506,91 @@ class ColumnReaderImpl implements ColumnReader {
 
   private void readPage() {
     if (DEBUG) LOG.debug("loading page");
-    Page page = pageReader.readPage();
+    DataPage page = pageReader.readPage();
+    page.accept(new DataPage.Visitor<Void>() {
+      @Override
+      public Void visit(DataPageV1 dataPageV1) {
+        readPageV1(dataPageV1);
+        return null;
+      }
+      @Override
+      public Void visit(DataPageV2 dataPageV2) {
+        readPageV2(dataPageV2);
+        return null;
+      }
+    });
+  }
 
-    this.repetitionLevelColumn = page.getRlEncoding().getValuesReader(path, ValuesType.REPETITION_LEVEL);
-    this.definitionLevelColumn = page.getDlEncoding().getValuesReader(path, ValuesType.DEFINITION_LEVEL);
-    if (page.getValueEncoding().usesDictionary()) {
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = readValues + pageValueCount;
+    if (dataEncoding.usesDictionary()) {
       if (dictionary == null) {
         throw new ParquetDecodingException(
-            "could not read page " + page + " in col " + path + " as the dictionary was missing for encoding " + page.getValueEncoding());
+            "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
       }
-      this.dataColumn = page.getValueEncoding().getDictionaryBasedValuesReader(path, ValuesType.VALUES, dictionary);
+      this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
     } else {
-      this.dataColumn = page.getValueEncoding().getValuesReader(path, ValuesType.VALUES);
+      this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
     }
-    if (page.getValueEncoding().usesDictionary() && converter.hasDictionarySupport()) {
+    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
       bindToDictionary(dictionary);
     } else {
       bind(path.getType());
     }
-    this.pageValueCount = page.getValueCount();
-    this.endOfPageValueCount = readValues + pageValueCount;
+    try {
+      dataColumn.initFromPage(pageValueCount, bytes, offset);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page in col " + path, e);
+    }
+  }
+
+  private void readPageV1(DataPageV1 page) { // V1 layout: rl, then dl, then data, all in one buffer
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
       byte[] bytes = page.getBytes().toByteArray();
       if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
       if (DEBUG) LOG.debug("reading repetition levels at 0");
-      repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
-      int next = repetitionLevelColumn.getNextOffset();
+      rlReader.initFromPage(page.getValueCount(), bytes, 0); // this page's count: pageValueCount is only set later by initDataReader
+      int next = rlReader.getNextOffset();
       if (DEBUG) LOG.debug("reading definition levels at " + next);
-      definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
-      next = definitionLevelColumn.getNextOffset();
+      dlReader.initFromPage(page.getValueCount(), bytes, next); // same reason as above: avoid the previous page's count
+      next = dlReader.getNextOffset();
       if (DEBUG) LOG.debug("reading data at " + next);
-      dataColumn.initFromPage(pageValueCount, bytes, next);
+      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
   }
 
+  private void readPageV2(DataPageV2 page) { // V2 layout: rl, dl and data are exposed as separate sections
+    this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
+    this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + page.getValueCount() + " records");
+      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+    }
+  }
+
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+          new RunLengthBitPackingHybridDecoder(
+              BytesUtils.getWidthFromMaxInt(maxLevel),
+              new ByteArrayInputStream(bytes.toByteArray())));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + path, e);
+    }
+  }
+
   private boolean isPageFullyConsumed() {
     return readValues >= endOfPageValueCount;
   }
@@ -556,4 +614,45 @@ class ColumnReaderImpl implements ColumnReader {
     return totalValueCount;
   }
 
+  static abstract class IntIterator { // yields repetition/definition levels one int per call
+    abstract int nextInt();
+  }
+
+  static class ValuesReaderIntIterator extends IntIterator { // adapts a V1 level ValuesReader (see readPageV1)
+    private final ValuesReader delegate;
+
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      super();
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  static class RLEIntIterator extends IntIterator { // reads V2 page levels through a RunLengthBitPackingHybridDecoder
+    private final RunLengthBitPackingHybridDecoder delegate;
+
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+    }
+  }
+
+  private static final class NullIntIterator extends IntIterator { // used by newRLEIterator when the max level is 0
+    @Override
+    int nextInt() {
+      return 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
deleted file mode 100644
index 9d3b15c..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.column.impl;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-
-
-public class ColumnWriteStoreImpl implements ColumnWriteStore {
-
-  private final Map<ColumnDescriptor, ColumnWriterImpl> columns = new TreeMap<ColumnDescriptor, ColumnWriterImpl>();
-  private final PageWriteStore pageWriteStore;
-  private final int pageSizeThreshold;
-  private final int dictionaryPageSizeThreshold;
-  private final boolean enableDictionary;
-  private final int initialSizePerCol;
-  private final WriterVersion writerVersion;
-
-  public ColumnWriteStoreImpl(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
-    super();
-    this.pageWriteStore = pageWriteStore;
-    this.pageSizeThreshold = pageSizeThreshold;
-    this.initialSizePerCol = initialSizePerCol;
-    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
-    this.enableDictionary = enableDictionary;
-    this.writerVersion = writerVersion;
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    ColumnWriterImpl column = columns.get(path);
-    if (column == null) {
-      column = newMemColumn(path);
-      columns.put(path, column);
-    }
-    return column;
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  private ColumnWriterImpl newMemColumn(ColumnDescriptor path) {
-    PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterImpl(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterImpl> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  public long allocatedSize() {
-    Collection<ColumnWriterImpl> values = columns.values();
-    long total = 0;
-    for (ColumnWriterImpl memColumn : values) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  public long memSize() {
-    Collection<ColumnWriterImpl> values = columns.values();
-    long total = 0;
-    for (ColumnWriterImpl memColumn : values) {
-      total += memColumn.getBufferedSizeInMemory();
-    }
-    return total;
-  }
-
-  public long maxColMemSize() {
-    Collection<ColumnWriterImpl> values = columns.values();
-    long max = 0;
-    for (ColumnWriterImpl memColumn : values) {
-      max = Math.max(max, memColumn.getBufferedSizeInMemory());
-    }
-    return max;
-  }
-
-  @Override
-  public void flush() {
-    Collection<ColumnWriterImpl> values = columns.values();
-    for (ColumnWriterImpl memColumn : values) {
-      memColumn.flush();
-    }
-  }
-
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    Collection<ColumnWriterImpl> values = columns.values();
-    for (ColumnWriterImpl memColumn : values) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
new file mode 100644
index 0000000..884c665
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+
+public class ColumnWriteStoreV1 implements ColumnWriteStore {
+
+  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
+  private final PageWriteStore pageWriteStore;
+  private final int pageSizeThreshold;
+  private final int dictionaryPageSizeThreshold;
+  private final boolean enableDictionary;
+  private final int initialSizePerCol;
+  private final WriterVersion writerVersion;
+
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+    super();
+    this.pageWriteStore = pageWriteStore;
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.initialSizePerCol = initialSizePerCol;
+    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
+    this.enableDictionary = enableDictionary;
+    this.writerVersion = writerVersion;
+  }
+
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    ColumnWriterV1 column = columns.get(path);
+    if (column == null) {
+      column = newMemColumn(path);
+      columns.put(path, column);
+    }
+    return column;
+  }
+
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
+    PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
+  }
+
+  @Override
+  public String toString() {
+      StringBuilder sb = new StringBuilder();
+      for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
+        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+        sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
+        sb.append("\n");
+      }
+      return sb.toString();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    Collection<ColumnWriterV1> values = columns.values();
+    long total = 0;
+    for (ColumnWriterV1 memColumn : values) {
+      total += memColumn.allocatedSize();
+    }
+    return total;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    Collection<ColumnWriterV1> values = columns.values();
+    long total = 0;
+    for (ColumnWriterV1 memColumn : values) {
+      total += memColumn.getBufferedSizeInMemory();
+    }
+    return total;
+  }
+
+  @Override
+  public String memUsageString() {
+    StringBuilder b = new StringBuilder("Store {\n");
+    Collection<ColumnWriterV1> values = columns.values();
+    for (ColumnWriterV1 memColumn : values) {
+      b.append(memColumn.memUsageString(" "));
+    }
+    b.append("}\n");
+    return b.toString();
+  }
+
+  public long maxColMemSize() {
+    Collection<ColumnWriterV1> values = columns.values();
+    long max = 0;
+    for (ColumnWriterV1 memColumn : values) {
+      max = Math.max(max, memColumn.getBufferedSizeInMemory());
+    }
+    return max;
+  }
+
+  @Override
+  public void flush() {
+    Collection<ColumnWriterV1> values = columns.values();
+    for (ColumnWriterV1 memColumn : values) {
+      memColumn.flush();
+    }
+  }
+
+  @Override
+  public void endRecord() {
+    // V1 does not take record boundaries into account
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
new file mode 100644
index 0000000..ba6edf3
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.schema.MessageType;
+
+/**
+ * Column write store for the V2 page format.
+ * <p>
+ * Pages are only written from {@link #endRecord()} and {@link #flush()}, so a page always
+ * ends on a record boundary. To keep the per-record overhead low, the buffered page sizes
+ * are not measured after every record. Instead, {@code sizeCheck()} estimates how many
+ * records remain before a page fills up and schedules the next check halfway there.
+ */
+public class ColumnWriteStoreV2 implements ColumnWriteStore {
+
+  // will wait for at least that many records before checking again
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+  // will never wait for more than that many records before checking again
+  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  // will flush even if the size is below the threshold by this much, to facilitate page alignment
+  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
+
+  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
+  private final Collection<ColumnWriterV2> writers;
+  // number of records ended so far (see endRecord)
+  private long rowCount;
+  // value of rowCount at which the next size check happens
+  private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+  // a page is written once no more than this many bytes remain before pageSizeThreshold
+  private final long thresholdTolerance;
+
+  private final int pageSizeThreshold;
+
+  /**
+   * Creates one {@link ColumnWriterV2} per leaf column of the schema.
+   *
+   * @param schema the schema whose columns get a writer each
+   * @param pageWriteStore provides the page writer of each column
+   * @param pageSizeThreshold the buffered size, in bytes, at which a column's page is written
+   * @param initialSizePerCol initial buffer size passed to each column writer
+   * @param parquetProps properties used to choose the value writers
+   */
+  public ColumnWriteStoreV2(
+      MessageType schema,
+      PageWriteStore pageWriteStore,
+      int pageSizeThreshold, int initialSizePerCol,
+      ParquetProperties parquetProps) {
+    super();
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
+    }
+    this.columns = unmodifiableMap(mcolumns);
+    this.writers = this.columns.values();
+  }
+
+  /**
+   * @param path a column of the schema
+   * @return the writer of that column, or null if the column is not part of the schema
+   */
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    return columns.get(path);
+  }
+
+  /**
+   * @return an unmodifiable view of the columns of this store
+   */
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
+      sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+      sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+      sb.append("\n");
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    long total = 0;
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      total += memColumn.allocatedSize();
+    }
+    return total;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    long total = 0;
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      total += memColumn.getTotalBufferedSize();
+    }
+    return total;
+  }
+
+  /**
+   * Writes a last page for every column that still has records not covered by a page,
+   * then finalizes each column chunk.
+   */
+  @Override
+  public void flush() {
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      long rows = rowCount - memColumn.getRowsWrittenSoFar();
+      if (rows > 0) {
+        memColumn.writePage(rowCount);
+      }
+      memColumn.finalizeColumnChunk();
+    }
+  }
+
+  @Override
+  public String memUsageString() {
+    StringBuilder b = new StringBuilder("Store {\n");
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      b.append(memColumn.memUsageString(" "));
+    }
+    b.append("}\n");
+    return b.toString();
+  }
+
+  /**
+   * Marks the end of a record.
+   * This is the only point (besides flush) where pages are cut, and it triggers a size check when one is due.
+   */
+  @Override
+  public void endRecord() {
+    ++ rowCount;
+    if (rowCount >= rowCountForNextSizeCheck) {
+      sizeCheck();
+    }
+  }
+
+  /**
+   * Writes a page for every column whose buffered data is within {@code thresholdTolerance}
+   * of {@code pageSizeThreshold}.
+   * It then schedules the next check halfway to the point where the first column is estimated
+   * to fill its page. That interval is clamped to [MINIMUM_RECORD_COUNT_FOR_CHECK,
+   * MAXIMUM_RECORD_COUNT_FOR_CHECK] records.
+   */
+  private void sizeCheck() {
+    long minRecordToWait = Long.MAX_VALUE;
+    for (ColumnWriterV2 writer : writers) {
+      long usedMem = writer.getCurrentPageBufferedSize();
+      long rows = rowCount - writer.getRowsWrittenSoFar();
+      long remainingMem = pageSizeThreshold - usedMem;
+      if (remainingMem <= thresholdTolerance) {
+        writer.writePage(rowCount);
+        remainingMem = pageSizeThreshold;
+      }
+      // extrapolate the records that still fit from the observed records-per-byte ratio.
+      // The whole expression must be computed in floating point and cast at the end:
+      // an integer division rows / usedMem truncates to 0 as soon as records average
+      // more than one byte, which would pin the check interval to its minimum.
+      long rowsToFillPage =
+          usedMem == 0 ?
+              MAXIMUM_RECORD_COUNT_FOR_CHECK
+              : (long)(((float)rows) / usedMem * remainingMem);
+      if (rowsToFillPage < minRecordToWait) {
+        minRecordToWait = rowsToFillPage;
+      }
+    }
+    if (minRecordToWait == Long.MAX_VALUE) {
+      minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
+    }
+    // will check again halfway
+    rowCountForNextSizeCheck = rowCount +
+        min(
+            max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
+            MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
deleted file mode 100644
index 628b848..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.column.impl;
-
-import static parquet.bytes.BytesInput.concat;
-
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
-/**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
- * Pages are cut on buffered size only (see accountForValueWritten); record boundaries are not considered.
- *
- * @author Julien Le Dem
- *
- */
-final class ColumnWriterImpl implements ColumnWriter {
-  private static final Log LOG = Log.getLog(ColumnWriterImpl.class);
-  private static final boolean DEBUG = Log.DEBUG;
-  // number of values written before the buffered size is measured for the first time
-  private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private final long pageSizeThreshold;
-  private ValuesWriter repetitionLevelColumn;
-  private ValuesWriter definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  // values buffered in the current page (reset to 0 by writePage)
-  private int valueCount;
-  // the buffered size is measured again once valueCount exceeds this
-  private int valueCountForNextSizeCheck;
-
-  // statistics of the current page (replaced by resetStatistics after each page)
-  private Statistics statistics;
-
-  public ColumnWriterImpl(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      int pageSizeThreshold,
-      int initialSizePerCol,
-      int dictionaryPageSizeThreshold,
-      boolean enableDictionary,
-      WriterVersion writerVersion) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    this.pageSizeThreshold = pageSizeThreshold;
-    // initial check of memory usage. So that we have enough data to make an initial prediction
-    this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
-    resetStatistics();
-
-    ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
-    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
-    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
-  }
-
-  // NOTE(review): never called in this class; same body as resetStatistics()
-  private void initStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
-  }
-
-  private void log(Object value, int r, int d) {
-    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
-  }
-
-  /**
-   * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
-   *
-   * We measure the memory used when we reach the mid point toward our estimated count.
-   * We then update the estimate and flush the page if we reached the threshold.
-   *
-   * That way we check the memory size log2(n) times.
-   *
-   */
-  private void accountForValueWritten() {
-    ++ valueCount;
-    if (valueCount > valueCountForNextSizeCheck) {
-      // not checking the memory used for every value
-      long memSize = repetitionLevelColumn.getBufferedSize()
-          + definitionLevelColumn.getBufferedSize()
-          + dataColumn.getBufferedSize();
-      if (memSize > pageSizeThreshold) {
-        // we will write the current page and check again the size at the predicted middle of next page
-        valueCountForNextSizeCheck = valueCount / 2;
-        writePage();
-      } else {
-        // not reached the threshold, will check again midway
-        // (midpoint between the current count and the count extrapolated to fill the page)
-        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
-      }
-    }
-  }
-
-  private void updateStatisticsNumNulls() {
-    statistics.incrementNumNulls();
-  }
-
-  private void updateStatistics(int value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(long value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(float value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(double value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(Binary value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(boolean value) {
-   statistics.updateStats(value);
-  }
-
-  /**
-   * Writes the buffered levels and values as one page: the rl, dl and data bytes are concatenated.
-   * Then resets the buffers, the value count and the statistics for the next page.
-   */
-  private void writePage() {
-    if (DEBUG) LOG.debug("write page");
-    try {
-      pageWriter.writePage(
-          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
-          valueCount,
-          statistics,
-          repetitionLevelColumn.getEncoding(),
-          definitionLevelColumn.getEncoding(),
-          dataColumn.getEncoding());
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
-  }
-
-  @Override
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    updateStatisticsNumNulls();
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeDouble(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeFloat(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBytes(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBoolean(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeInteger(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeLong(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  /**
-   * Writes the pending page, if any, then the dictionary page if the data writer produced one.
-   */
-  @Override
-  public void flush() {
-    if (valueCount > 0) {
-      writePage();
-    }
-    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-      }
-      dataColumn.resetDictionary();
-    }
-  }
-
-  // levels + data buffered for the current page, plus what the page writer reports holding
-  @Override
-  public long getBufferedSizeInMemory() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
-
-  // allocated (not just used) size of the level/data writers and of the page writer
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
-  }
-
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(repetitionLevelColumn.memUsageString(indent + "  r:")).append("\n");
-    b.append(definitionLevelColumn.memUsageString(indent + "  d:")).append("\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
new file mode 100644
index 0000000..8b72207
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static parquet.bytes.BytesInput.concat;
+
+import java.io.IOException;
+
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.Statistics;
+import parquet.column.values.ValuesWriter;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+
+/**
+ * Column writer for the V1 page format.
+ * It buffers (repetition level, definition level, value) triplets and hands a page to the
+ * {@link PageWriter} whenever the buffered size exceeds the page size threshold, so page
+ * boundaries do not depend on record boundaries.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV1 implements ColumnWriter {
+  private static final Log LOG = Log.getLog(ColumnWriterV1.class);
+  private static final boolean DEBUG = Log.DEBUG;
+  // values written before the first size measurement, so the first estimate has data to work with
+  private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
+
+  private final ColumnDescriptor path;
+  private final PageWriter pageWriter;
+  private final long pageSizeThreshold;
+  private ValuesWriter repetitionLevelColumn;
+  private ValuesWriter definitionLevelColumn;
+  private ValuesWriter dataColumn;
+  // values buffered in the current page
+  private int valueCount;
+  // the buffered size is measured again once valueCount exceeds this
+  private int valueCountForNextSizeCheck;
+
+  // statistics of the current page
+  private Statistics statistics;
+
+  public ColumnWriterV1(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      int pageSizeThreshold,
+      int initialSizePerCol,
+      int dictionaryPageSizeThreshold,
+      boolean enableDictionary,
+      WriterVersion writerVersion) {
+    this.path = path;
+    this.pageWriter = pageWriter;
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
+    resetStatistics();
+
+    ParquetProperties props = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+    this.repetitionLevelColumn =
+        ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
+    this.definitionLevelColumn =
+        ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
+    this.dataColumn = props.getValuesWriter(path, initialSizePerCol);
+  }
+
+  private void log(Object value, int r, int d) {
+    String message = path + " " + value + " r:" + r + " d:" + d;
+    LOG.debug(message);
+  }
+
+  private void resetStatistics() {
+    statistics = Statistics.getStatsBasedOnType(path.getType());
+  }
+
+  /** Buffers the repetition level, then the definition level, of one triplet. */
+  private void writeLevels(int repetitionLevel, int definitionLevel) {
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+  }
+
+  /** @return the bytes buffered for the page being built (levels + data, page writer excluded) */
+  private long currentPageSize() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize();
+  }
+
+  /**
+   * Counts one more buffered value and, from time to time, measures the page size.
+   * <p>
+   * When the size exceeds the threshold, the page is written. Otherwise, the value count
+   * that would fill the page is extrapolated and the next measurement happens halfway
+   * there. The size is thus measured about log2(n) times per page.
+   */
+  private void accountForValueWritten() {
+    ++valueCount;
+    if (valueCount <= valueCountForNextSizeCheck) {
+      return; // not measuring the size for every value
+    }
+    long memSize = currentPageSize();
+    if (memSize > pageSizeThreshold) {
+      // page is full: write it, and measure the next one at its predicted middle
+      valueCountForNextSizeCheck = valueCount / 2;
+      writePage();
+      return;
+    }
+    // current count + count extrapolated to fill the page; the next check is at their midpoint
+    float currentPlusFull = valueCount + ((float) valueCount * pageSizeThreshold / memSize);
+    valueCountForNextSizeCheck = (int) currentPlusFull / 2 + 1;
+  }
+
+  /** Writes the buffered triplets as one page, then starts a new, empty page. */
+  private void writePage() {
+    if (DEBUG) LOG.debug("write page");
+    try {
+      pageWriter.writePage(
+          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
+          valueCount,
+          statistics,
+          repetitionLevelColumn.getEncoding(),
+          definitionLevelColumn.getEncoding(),
+          dataColumn.getEncoding());
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page for " + path, e);
+    }
+    startNewPage();
+  }
+
+  /** Clears the level/data buffers, the value count and the statistics. */
+  private void startNewPage() {
+    repetitionLevelColumn.reset();
+    definitionLevelColumn.reset();
+    dataColumn.reset();
+    valueCount = 0;
+    resetStatistics();
+  }
+
+  @Override
+  public void writeNull(int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(null, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    statistics.incrementNumNulls();
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(double value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeDouble(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(float value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeFloat(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(Binary value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeBytes(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(boolean value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeBoolean(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(int value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeInteger(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(long value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    writeLevels(repetitionLevel, definitionLevel);
+    dataColumn.writeLong(value);
+    statistics.updateStats(value);
+    accountForValueWritten();
+  }
+
+  /**
+   * Writes the pending page, if any, then the dictionary page when the data writer built one.
+   */
+  public void flush() {
+    if (valueCount > 0) {
+      writePage();
+    }
+    DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    if (dictionaryPage == null) {
+      return;
+    }
+    if (DEBUG) LOG.debug("write dictionary");
+    try {
+      pageWriter.writeDictionaryPage(dictionaryPage);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+    }
+    dataColumn.resetDictionary();
+  }
+
+  /** @return bytes buffered for the current page plus the size reported by the page writer */
+  public long getBufferedSizeInMemory() {
+    return currentPageSize() + pageWriter.getMemSize();
+  }
+
+  /** @return bytes allocated by the level/data writers and by the page writer */
+  public long allocatedSize() {
+    long levels = repetitionLevelColumn.getAllocatedSize() + definitionLevelColumn.getAllocatedSize();
+    return levels + dataColumn.getAllocatedSize() + pageWriter.allocatedSize();
+  }
+
+  public String memUsageString(String indent) {
+    String child = indent + "  ";
+    return new StringBuilder(indent).append(path).append(" {\n")
+        .append(repetitionLevelColumn.memUsageString(child + "r:")).append("\n")
+        .append(definitionLevelColumn.memUsageString(child + "d:")).append("\n")
+        .append(dataColumn.memUsageString(child + "data:")).append("\n")
+        .append(pageWriter.memUsageString(child + "pages:")).append("\n")
+        .append(indent).append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n")
+        .append(indent).append("}\n")
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
new file mode 100644
index 0000000..65f0366
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
@@ -0,0 +1,295 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
+import java.io.IOException;
+
+import parquet.Ints;
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriter;
+import parquet.column.Encoding;
+import parquet.column.ParquetProperties;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.Statistics;
+import parquet.column.values.ValuesWriter;
+import parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV2 implements ColumnWriter {
+  private static final Log LOG = Log.getLog(ColumnWriterV2.class);
+  private static final boolean DEBUG = Log.DEBUG;
+
+  private final ColumnDescriptor path;
+  private final PageWriter pageWriter;
+  // levels are encoded with their own RLE/bit-packing hybrid encoders, separately from the data
+  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
+  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
+  private ValuesWriter dataColumn;
+  // values (nulls included) written by the write*/writeNull methods;
+  // presumably reset when a page is written (writePage is outside this view) - TODO confirm
+  private int valueCount;
+
+  private Statistics<?> statistics;
+  // NOTE(review): presumably the record count already covered by written pages, read by
+  // ColumnWriteStoreV2 through getRowsWrittenSoFar() - confirm against writePage(long)
+  private long rowsWrittenSoFar = 0;
+
+  /**
+   * @param path the column to write
+   * @param pageWriter receives the pages and the dictionary page of this column
+   * @param initialSizePerCol initial buffer size of the level encoders and of the data writer
+   * @param parquetProps used to choose the data writer of the column
+   */
+  public ColumnWriterV2(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      int initialSizePerCol,
+      ParquetProperties parquetProps) {
+    this.path = path;
+    this.pageWriter = pageWriter;
+    resetStatistics();
+    int repetitionBitWidth = getWidthFromMaxInt(path.getMaxRepetitionLevel());
+    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(repetitionBitWidth, initialSizePerCol);
+    int definitionBitWidth = getWidthFromMaxInt(path.getMaxDefinitionLevel());
+    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(definitionBitWidth, initialSizePerCol);
+    this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
+  }
+
+  /** Debug-logs one triplet as "path value r:R d:D". */
+  private void log(Object value, int r, int d) {
+    String message = path + " " + value + " r:" + r + " d:" + d;
+    LOG.debug(message);
+  }
+
+  /** Replaces the current statistics with a new instance for this column's type. */
+  private void resetStatistics() {
+    statistics = Statistics.getStatsBasedOnType(path.getType());
+  }
+
+  /**
+   * Appends one definition level to its encoder.
+   * An encoder IOException is rethrown as a ParquetEncodingException.
+   */
+  private void definitionLevel(int level) {
+    try {
+      definitionLevelColumn.writeInt(level);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("illegal definition level " + level + " for column " + path, e);
+    }
+  }
+
+  /**
+   * Appends one repetition level to its encoder.
+   * An encoder IOException is rethrown as a ParquetEncodingException.
+   */
+  private void repetitionLevel(int level) {
+    try {
+      repetitionLevelColumn.writeInt(level);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("illegal repetition level " + level + " for column " + path, e);
+    }
+  }
+
+  /**
+   * writes the current null value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void writeNull(int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(null, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    statistics.incrementNumNulls();
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(double value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeDouble(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(float value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeFloat(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(Binary value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBytes(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * Writes a boolean value together with its repetition and definition levels.
+   * @param value the value handed to the data column
+   * @param repetitionLevel repetition level of the value
+   * @param definitionLevel definition level of the value
+   */
+  public void write(boolean value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) { log(value, repetitionLevel, definitionLevel); }
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBoolean(value);
+    statistics.updateStats(value);
+    valueCount += 1;
+  }
+
+  /**
+   * Writes an int value together with its repetition and definition levels.
+   * @param value the value handed to the data column
+   * @param repetitionLevel repetition level of the value
+   * @param definitionLevel definition level of the value
+   */
+  public void write(int value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) { log(value, repetitionLevel, definitionLevel); }
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeInteger(value);
+    statistics.updateStats(value);
+    valueCount += 1;
+  }
+
+  /**
+   * Writes a long value together with its repetition and definition levels.
+   * @param value the value handed to the data column
+   * @param repetitionLevel repetition level of the value
+   * @param definitionLevel definition level of the value
+   */
+  public void write(long value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) { log(value, repetitionLevel, definitionLevel); }
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeLong(value);
+    statistics.updateStats(value);
+    valueCount += 1;
+  }
+
+  /**
+   * Finalizes the column chunk (called right after writePage): emits the dictionary page, if any, then resets it.
+   */
+  public void finalizeColumnChunk() {
+    final DictionaryPage dict = dataColumn.createDictionaryPage();
+    if (dict == null) {
+      return; // the values writer produced no dictionary for this chunk
+    }
+    if (DEBUG) LOG.debug("write dictionary");
+    try {
+      pageWriter.writeDictionaryPage(dict);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+    }
+    dataColumn.resetDictionary();
+  }
+
+  /**
+   * Used to decide when to cut a page.
+   * @return bytes of memory buffered for the page being built (levels and values)
+   */
+  public long getCurrentPageBufferedSize() {
+    final long levelBytes = repetitionLevelColumn.getBufferedSize() + definitionLevelColumn.getBufferedSize();
+    final long valueBytes = dataColumn.getBufferedSize();
+    return levelBytes + valueBytes;
+  }
+
+  /**
+   * Used to decide when to cut a page or a row group.
+   * @return bytes buffered for the current page plus the memory held by the page writer
+   */
+  public long getTotalBufferedSize() {
+    final long currentPage = repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize();
+    return currentPage + pageWriter.getMemSize();
+  }
+
+  /**
+   * @return memory actually allocated by the level writers, the values writer and the page writer
+   */
+  public long allocatedSize() {
+    long total = repetitionLevelColumn.getAllocatedSize();
+    total += definitionLevelColumn.getAllocatedSize();
+    total += dataColumn.getAllocatedSize();
+    return total + pageWriter.allocatedSize();
+  }
+
+  /**
+   * @param indent a prefix used to indent the lines of the summary
+   * @return a formatted multi-line string showing how memory is used by this column writer
+   */
+  public String memUsageString(String indent) {
+    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
+    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
+    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
+    b.append(indent).append("}\n");
+    return b.toString();
+  }
+
+  public long getRowsWrittenSoFar() {
+    return rowsWrittenSoFar; // cumulative row count recorded by the last writePage call
+  }
+
+  /**
+   * Flushes the buffered levels and values as a new V2 data page in the page store,
+   * then resets the per-page buffers, value count and statistics.
+   * @param rowCount how many rows have been written so far (cumulative, not per page)
+   */
+  public void writePage(long rowCount) {
+    final int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
+    rowsWrittenSoFar = rowCount;
+    if (DEBUG) LOG.debug("write page");
+    try {
+      // TODO: rework this API. getBytes() must be called before getEncoding()
+      final BytesInput values = dataColumn.getBytes();
+      final Encoding valuesEncoding = dataColumn.getEncoding();
+      final int nullCount = Ints.checkedCast(statistics.getNumNulls());
+      // a column whose max level is 0 stores no levels, so an empty buffer is passed instead
+      final BytesInput rlBytes = path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes();
+      final BytesInput dlBytes = path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes();
+      pageWriter.writePageV2(
+          pageRowCount, nullCount, valueCount,
+          rlBytes, dlBytes,
+          valuesEncoding, values,
+          statistics);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page for " + path, e);
+    }
+    repetitionLevelColumn.reset();
+    definitionLevelColumn.reset();
+    dataColumn.reset();
+    valueCount = 0;
+    resetStatistics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPage.java b/parquet-column/src/main/java/parquet/column/page/DataPage.java
new file mode 100644
index 0000000..3a1afa0
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPage.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.page;
+
+/**
+ * A data page of a column chunk. It records how many values it holds; the concrete
+ * page formats (V1, V2) are reached through {@link Visitor}.
+ *
+ * @author Julien Le Dem
+ */
+public abstract class DataPage extends Page {
+
+  private final int valueCount;
+
+  DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+    super(compressedSize, uncompressedSize);
+    this.valueCount = valueCount;
+  }
+
+  /**
+   * @return how many values this page contains
+   */
+  public int getValueCount() {
+    return valueCount;
+  }
+
+  /**
+   * Dispatches to the {@link Visitor} method matching this page's concrete type.
+   */
+  public abstract <T> T accept(Visitor<T> visitor);
+
+  /** Callback over the concrete data page formats. */
+  public interface Visitor<T> {
+    T visit(DataPageV1 dataPageV1);
+    T visit(DataPageV2 dataPageV2);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
new file mode 100644
index 0000000..a53eed8
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
@@ -0,0 +1,80 @@
+package parquet.column.page;
+
+import parquet.Ints;
+import parquet.bytes.BytesInput;
+import parquet.column.Encoding;
+import parquet.column.statistics.Statistics;
+
+/** One data page in the V1 format: a single byte buffer plus the rl, dl and values encodings. */
+public class DataPageV1 extends DataPage {
+
+  private final BytesInput bytes;
+  private final Statistics<?> statistics;
+  private final Encoding rlEncoding;
+  private final Encoding dlEncoding;
+  private final Encoding valuesEncoding;
+
+  /**
+   * Creates a V1 data page; its compressed size is the size of {@code bytes}.
+   * @param bytes the bytes for this page
+   * @param valueCount count of values in this page
+   * @param uncompressedSize the uncompressed size of the page
+   * @param stats statistics of the page's values (max, min, num_nulls)
+   * @param rlEncoding the repetition level encoding for this page
+   * @param dlEncoding the definition level encoding for this page
+   * @param valuesEncoding the values encoding for this page
+   */
+  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
+    this.bytes = bytes;
+    this.statistics = stats;
+    this.rlEncoding = rlEncoding;
+    this.dlEncoding = dlEncoding;
+    this.valuesEncoding = valuesEncoding;
+  }
+
+  /**
+   * @return the bytes for the page
+   */
+  public BytesInput getBytes() {
+    return bytes;
+  }
+
+  /**
+   * @return the statistics for this page (max, min, num_nulls)
+   */
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  /**
+   * @return the definition level encoding for this page
+   */
+  public Encoding getDlEncoding() {
+    return dlEncoding;
+  }
+
+  /**
+   * @return the repetition level encoding for this page
+   */
+  public Encoding getRlEncoding() {
+    return rlEncoding;
+  }
+
+  /**
+   * @return the values encoding for this page
+   */
+  public Encoding getValueEncoding() {
+    return valuesEncoding;
+  }
+
+  @Override
+  public String toString() {
+    return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
new file mode 100644
index 0000000..bc9a873
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
@@ -0,0 +1,138 @@
+package parquet.column.page;
+
+import parquet.Ints;
+import parquet.bytes.BytesInput;
+import parquet.column.Encoding;
+import parquet.column.statistics.Statistics;
+
+/**
+ * One data page in the V2 format: RLE encoded repetition levels, definition levels and data in separate buffers.
+ */
+public class DataPageV2 extends DataPage {
+
+  /**
+   * Builds a page whose data is not compressed; its uncompressed size is the sum of the three buffers.
+   * @param rowCount number of rows in this page
+   * @param nullCount number of null values (out of valueCount)
+   * @param valueCount number of values in this page
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding
+   * @param statistics optional statistics for this page
+   * @return an uncompressed page
+   */
+  public static DataPageV2 uncompressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      Statistics<?> statistics) {
+    int totalSize = Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size());
+    return new DataPageV2(rowCount, nullCount, valueCount, repetitionLevels, definitionLevels,
+        dataEncoding, data, totalSize, statistics, false);
+  }
+
+  /**
+   * Builds a page whose data buffer has already been compressed.
+   * @param rowCount number of rows in this page
+   * @param nullCount number of null values (out of valueCount)
+   * @param valueCount number of values in this page
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding and compressed
+   * @param uncompressedSize total size uncompressed (rl + dl + data)
+   * @param statistics optional statistics for this page
+   * @return a compressed page
+   */
+  public static DataPageV2 compressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize, Statistics<?> statistics) {
+    return new DataPageV2(rowCount, nullCount, valueCount, repetitionLevels, definitionLevels,
+        dataEncoding, data, uncompressedSize, statistics, true);
+  }
+
+  private final int rowCount;
+  private final int nullCount;
+  private final BytesInput repetitionLevels;
+  private final BytesInput definitionLevels;
+  private final Encoding dataEncoding;
+  private final BytesInput data;
+  private final Statistics<?> statistics;
+  private final boolean isCompressed;
+
+  /**
+   * Prefer {@link #uncompressed} or {@link #compressed}. The compressed size is the sum of the three buffers.
+   */
+  public DataPageV2(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize, Statistics<?> statistics, boolean isCompressed) {
+    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
+    this.rowCount = rowCount;
+    this.nullCount = nullCount;
+    this.repetitionLevels = repetitionLevels;
+    this.definitionLevels = definitionLevels;
+    this.dataEncoding = dataEncoding;
+    this.data = data;
+    this.statistics = statistics;
+    this.isCompressed = isCompressed;
+  }
+
+  /** @return the number of rows in this page */
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  /** @return the number of null values in this page */
+  public int getNullCount() {
+    return nullCount;
+  }
+
+  /** @return the RLE encoded repetition levels */
+  public BytesInput getRepetitionLevels() {
+    return repetitionLevels;
+  }
+
+  /** @return the RLE encoded definition levels */
+  public BytesInput getDefinitionLevels() {
+    return definitionLevels;
+  }
+
+  /** @return the encoding of the data buffer */
+  public Encoding getDataEncoding() {
+    return dataEncoding;
+  }
+
+  /** @return the encoded (and, if {@link #isCompressed()}, compressed) data */
+  public BytesInput getData() {
+    return data;
+  }
+
+  /** @return the optional statistics for this page */
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  /** @return whether the data buffer is compressed */
+  public boolean isCompressed() {
+    return isCompressed;
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+
+  @Override
+  public String toString() {
+    return "Page V2 [dl size=" + definitionLevels.size()
+        + ", rl size=" + repetitionLevels.size()
+        + ", data size=" + data.size()
+        + ", data enc=" + dataEncoding
+        + ", valueCount=" + getValueCount()
+        + ", rowCount=" + getRowCount()
+        + ", is compressed=" + isCompressed
+        + ", uncompressedSize=" + getUncompressedSize() + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java b/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
index 78e88a2..9ae1037 100644
--- a/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
+++ b/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
@@ -19,6 +19,7 @@ import static parquet.Preconditions.checkNotNull;
 
 import java.io.IOException;
 
+import parquet.Ints;
 import parquet.bytes.BytesInput;
 import parquet.column.Encoding;
 
@@ -28,10 +29,9 @@ import parquet.column.Encoding;
  * @author Julien Le Dem
  *
  */
-public class DictionaryPage {
+public class DictionaryPage extends Page {
 
   private final BytesInput bytes;
-  private final int uncompressedSize;
   private final int dictionarySize;
   private final Encoding encoding;
 
@@ -53,8 +53,8 @@ public class DictionaryPage {
    * @param encoding the encoding used
    */
   public DictionaryPage(BytesInput bytes, int uncompressedSize, int dictionarySize, Encoding encoding) {
+    super(Ints.checkedCast(checkNotNull(bytes, "bytes").size()), uncompressedSize); // null-check before size() so a null gets the descriptive message
     this.bytes = checkNotNull(bytes, "bytes");
-    this.uncompressedSize = uncompressedSize;
     this.dictionarySize = dictionarySize;
     this.encoding = checkNotNull(encoding, "encoding");
   }
@@ -63,10 +63,6 @@ public class DictionaryPage {
     return bytes;
   }
 
-  public int getUncompressedSize() {
-    return uncompressedSize;
-  }
-
   public int getDictionarySize() {
     return dictionarySize;
   }
@@ -76,13 +72,13 @@ public class DictionaryPage {
   }
 
   public DictionaryPage copy() throws IOException {
-    return new DictionaryPage(BytesInput.copy(bytes), uncompressedSize, dictionarySize, encoding);
+    return new DictionaryPage(BytesInput.copy(bytes), getUncompressedSize(), dictionarySize, encoding);
   }
 
 
   @Override
   public String toString() {
-    return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + uncompressedSize + ", encoding=" + encoding + "]";
+    return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + getUncompressedSize() + ", encoding=" + encoding + "]";
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/Page.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/Page.java b/parquet-column/src/main/java/parquet/column/page/Page.java
index 192b2a9..e5ab636 100644
--- a/parquet-column/src/main/java/parquet/column/page/Page.java
+++ b/parquet-column/src/main/java/parquet/column/page/Page.java
@@ -1,145 +1,31 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package parquet.column.page;
 
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
-
 /**
  * one page in a chunk
  *
  * @author Julien Le Dem
  *
  */
-public class Page {
-  private static final boolean DEBUG = Log.DEBUG;
-  private static final Log LOG = Log.getLog(Page.class);
-
-  private static int nextId = 0;
+abstract public class Page {
 
-  private final BytesInput bytes;
-  private final int valueCount;
+  private final int compressedSize;
   private final int uncompressedSize;
-  private final Statistics statistics;
-  private final Encoding rlEncoding;
-  private final Encoding dlEncoding;
-  private final Encoding valuesEncoding;
-  private final int id;
 
-  @Deprecated
-  /**
-   * @param bytes the bytes for this page
-   * @param valueCount count of values in this page
-   * @param uncompressedSize the uncompressed size of the page
-   * @param rlEncoding the repetition level encoding for this page
-   * @param dlEncoding the definition level encoding for this page
-   * @param valuesEncoding the values encoding for this page
-   * @param dlEncoding
-   */
-  public Page(BytesInput bytes, int valueCount, int uncompressedSize, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
-    this.bytes = bytes;
-    this.valueCount = valueCount;
+  Page(int compressedSize, int uncompressedSize) {
+    super();
+    this.compressedSize = compressedSize;
     this.uncompressedSize = uncompressedSize;
-    this.statistics = new BooleanStatistics();
-    this.rlEncoding = rlEncoding;
-    this.dlEncoding = dlEncoding;
-    this.valuesEncoding = valuesEncoding;
-    this.id = nextId ++;
-    if (DEBUG) LOG.debug("new Page #"+id+" : " + bytes.size() + " bytes and " + valueCount + " records");
-  }
-  /**
-   * @param bytes the bytes for this page
-   * @param valueCount count of values in this page
-   * @param uncompressedSize the uncompressed size of the page
-   * @param statistics of the page's values (max, min, num_null)
-   * @param rlEncoding the repetition level encoding for this page
-   * @param dlEncoding the definition level encoding for this page
-   * @param valuesEncoding the values encoding for this page
-   * @param dlEncoding
-   */
-  public Page(BytesInput bytes, int valueCount, int uncompressedSize, Statistics stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
-    this.bytes = bytes;
-    this.valueCount = valueCount;
-    this.uncompressedSize = uncompressedSize;
-    this.statistics = stats;
-    this.rlEncoding = rlEncoding;
-    this.dlEncoding = dlEncoding;
-    this.valuesEncoding = valuesEncoding;
-    this.id = nextId ++;
-    if (DEBUG) LOG.debug("new Page #"+id+" : " + bytes.size() + " bytes and " + valueCount + " records");
-  }
-  /**
-   *
-   * @return the bytes for the page
-   */
-  public BytesInput getBytes() {
-    return bytes;
   }
 
-  /**
-   *
-   * @return the number of values in that page
-   */
-  public int getValueCount() {
-    return valueCount;
+  public int getCompressedSize() {
+    return compressedSize;
   }
 
-  /**
-   *
-   * @return the uncompressed size of the page when the bytes are compressed
-   */
+  /**
+   * @return the uncompressed size of the page (meaningful when the bytes are compressed)
+   */
   public int getUncompressedSize() {
     return uncompressedSize;
   }
 
-  /**
-   *
-   * @return the statistics for this page (max, min, num_nulls)
-   */
-  public Statistics getStatistics() {
-    return statistics;
-  }
-
-  /**
-   * @return the definition level encoding for this page
-   */
-  public Encoding getDlEncoding() {
-    return dlEncoding;
-  }
-
-  /**
-   * @return the repetition level encoding for this page
-   */
-  public Encoding getRlEncoding() {
-    return rlEncoding;
-  }
-
-  /**
-   * @return the values encoding for this page
-   */
-  public Encoding getValueEncoding() {
-    return valuesEncoding;
-  }
-
-  @Override
-  public String toString() {
-    return "Page [id: " + id + ", bytes.size=" + bytes.size() + ", valueCount=" + valueCount + ", uncompressedSize=" + uncompressedSize + "]";
-  }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/PageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/PageReader.java b/parquet-column/src/main/java/parquet/column/page/PageReader.java
index f2ef171..d115037 100644
--- a/parquet-column/src/main/java/parquet/column/page/PageReader.java
+++ b/parquet-column/src/main/java/parquet/column/page/PageReader.java
@@ -36,5 +36,5 @@ public interface PageReader {
   /**
    * @return the next page in that chunk or null if after the last page
    */
-  Page readPage();
+  DataPage readPage();
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/PageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/parquet/column/page/PageWriter.java
index 5197e52..1d6aa52 100644
--- a/parquet-column/src/main/java/parquet/column/page/PageWriter.java
+++ b/parquet-column/src/main/java/parquet/column/page/PageWriter.java
@@ -29,29 +29,37 @@ import parquet.column.statistics.Statistics;
  */
 public interface PageWriter {
 
-  @Deprecated
   /**
    * writes a single page
    * @param bytesInput the bytes for the page
    * @param valueCount the number of values in that page
+   * @param statistics the statistics for that page
    * @param rlEncoding repetition level encoding
    * @param dlEncoding definition level encoding
    * @param valuesEncoding values encoding
    * @throws IOException
    */
-  void writePage(BytesInput bytesInput, int valueCount, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+  void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
 
   /**
-   * writes a single page
-   * @param bytesInput the bytes for the page
-   * @param valueCount the number of values in that page
-   * @param statistics the statistics for that page
-   * @param rlEncoding repetition level encoding
-   * @param dlEncoding definition level encoding
-   * @param valuesEncoding values encoding
+   * writes a single data page in the V2 format (levels passed separately from the data)
+   * @param rowCount the number of rows in this page
+   * @param nullCount the number of null values (out of valueCount)
+   * @param valueCount the number of values in that page
+   *        (there could be multiple values per row for repeated fields)
+   * @param repetitionLevels the repetition levels encoded in RLE without any size header
+   * @param definitionLevels the definition levels encoded in RLE without any size header
+   * @param dataEncoding the encoding for the data
+   * @param data the data encoded with dataEncoding
+   * @param statistics optional stats for this page
    * @throws IOException
    */
-  void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+  void writePageV2(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput data,
+      Statistics<?> statistics) throws IOException;
 
   /**
    * @return the current size used in the memory buffer for that column chunk
@@ -69,6 +77,10 @@ public interface PageWriter {
    */
   void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException;
 
-  public abstract String memUsageString(String prefix);
+  /**
+   * @param prefix a prefix header to add at every line
+   * @return a string presenting a summary of how memory is used
+   */
+  String memUsageString(String prefix);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
index a0c949c..1b674f5 100644
--- a/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
@@ -66,9 +66,8 @@ public abstract class ValuesWriter {
   }
 
   /**
-   *
+   * (expected to be > {@link #getBufferedSize()})
    * @return the allocated size of the buffer
-   * ( > {@link #getBufferedMemorySize()() )
    */
   abstract public long getAllocatedSize();