You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:40 UTC

[43/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
new file mode 100644
index 0000000..2fa63a8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -0,0 +1,661 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.String.format;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+
+/**
+ * ColumnReader implementation
+ *
+ * @author Julien Le Dem
+ *
+ */
+class ColumnReaderImpl implements ColumnReader {
+  private static final Log LOG = Log.getLog(ColumnReaderImpl.class);
+
+  /**
+   * Couples the low level page decoder with the record converter that materializes records.
+   * A binding pulls one value at a time from the current page and can then either push it
+   * to the converter or expose it through a typed getter. Typed getters that a concrete
+   * binding does not override throw {@link UnsupportedOperationException}.
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private abstract static class Binding {
+
+    /**
+     * reads the next value of the underlying page
+     */
+    abstract void read();
+
+    /**
+     * skips the next value of the underlying page
+     */
+    abstract void skip();
+
+    /**
+     * hands the current value to the converter
+     */
+    abstract void writeValue();
+
+    /**
+     * @return the dictionary id of the current value
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public int getDictionaryId() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a boolean
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public boolean getBoolean() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as an int
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public int getInteger() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a long
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public long getLong() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a float
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public float getFloat() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a double
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public double getDouble() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a Binary
+     * @throws UnsupportedOperationException unless the concrete binding overrides this getter
+     */
+    public Binary getBinary() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private final ColumnDescriptor path;
+  private final long totalValueCount;
+  private final PageReader pageReader;
+  private final Dictionary dictionary;
+
+  private IntIterator repetitionLevelColumn;
+  private IntIterator definitionLevelColumn;
+  protected ValuesReader dataColumn;
+
+  private int repetitionLevel;
+  private int definitionLevel;
+  private int dictionaryId;
+
+  private long endOfPageValueCount;
+  private int readValues;
+  private int pageValueCount;
+
+  private final PrimitiveConverter converter;
+  private Binding binding;
+
+  // this is needed because we will attempt to read the value twice when filtering
+  // TODO: rework that
+  private boolean valueRead;
+
+  /**
+   * Installs a binding that reads dictionary ids from the current page. The id is passed
+   * as is to the converter, and is only decoded through the dictionary when one of the
+   * typed getters is called.
+   *
+   * @param dictionary the dictionary used to decode ids into typed values
+   */
+  private void bindToDictionary(final Dictionary dictionary) {
+    final Binding dictionaryBinding = new Binding() {
+      @Override
+      void read() {
+        dictionaryId = dataColumn.readValueDictionaryId();
+      }
+      @Override
+      public void skip() {
+        dataColumn.skip();
+      }
+      @Override
+      void writeValue() {
+        converter.addValueFromDictionary(dictionaryId);
+      }
+      @Override
+      public int getDictionaryId() {
+        return dictionaryId;
+      }
+      @Override
+      public boolean getBoolean() {
+        return dictionary.decodeToBoolean(dictionaryId);
+      }
+      @Override
+      public int getInteger() {
+        return dictionary.decodeToInt(dictionaryId);
+      }
+      @Override
+      public long getLong() {
+        return dictionary.decodeToLong(dictionaryId);
+      }
+      @Override
+      public float getFloat() {
+        return dictionary.decodeToFloat(dictionaryId);
+      }
+      @Override
+      public double getDouble() {
+        return dictionary.decodeToDouble(dictionaryId);
+      }
+      @Override
+      public Binary getBinary() {
+        return dictionary.decodeToBinary(dictionaryId);
+      }
+    };
+    this.binding = dictionaryBinding;
+  }
+
+  /**
+   * Installs a binding that reads values of the given primitive type directly from the
+   * current page. Each binding keeps the last value read, resets it when a value is
+   * skipped, and forwards it to the matching typed method of the converter.
+   * INT96 and FIXED_LEN_BYTE_ARRAY reuse the BINARY binding.
+   *
+   * @param type the primitive type of the column
+   */
+  private void bind(PrimitiveTypeName type) {
+    final PrimitiveTypeNameConverter<Binding, RuntimeException> bindingFactory =
+        new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
+      @Override
+      public Binding convertBOOLEAN(PrimitiveTypeName typeName) {
+        return new Binding() {
+          boolean value;
+          @Override
+          void read() {
+            value = dataColumn.readBoolean();
+          }
+          @Override
+          public void skip() {
+            value = false;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addBoolean(value);
+          }
+          @Override
+          public boolean getBoolean() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertINT32(PrimitiveTypeName typeName) {
+        return new Binding() {
+          int value;
+          @Override
+          void read() {
+            value = dataColumn.readInteger();
+          }
+          @Override
+          public void skip() {
+            value = 0;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addInt(value);
+          }
+          @Override
+          public int getInteger() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertINT64(PrimitiveTypeName typeName) {
+        return new Binding() {
+          long value;
+          @Override
+          void read() {
+            value = dataColumn.readLong();
+          }
+          @Override
+          public void skip() {
+            value = 0;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addLong(value);
+          }
+          @Override
+          public long getLong() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertINT96(PrimitiveTypeName typeName) {
+        // read as raw bytes, exactly like BINARY
+        return convertBINARY(typeName);
+      }
+      @Override
+      public Binding convertFLOAT(PrimitiveTypeName typeName) {
+        return new Binding() {
+          float value;
+          @Override
+          void read() {
+            value = dataColumn.readFloat();
+          }
+          @Override
+          public void skip() {
+            value = 0;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addFloat(value);
+          }
+          @Override
+          public float getFloat() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertDOUBLE(PrimitiveTypeName typeName) {
+        return new Binding() {
+          double value;
+          @Override
+          void read() {
+            value = dataColumn.readDouble();
+          }
+          @Override
+          public void skip() {
+            value = 0;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addDouble(value);
+          }
+          @Override
+          public double getDouble() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertBINARY(PrimitiveTypeName typeName) {
+        return new Binding() {
+          Binary value;
+          @Override
+          void read() {
+            value = dataColumn.readBytes();
+          }
+          @Override
+          public void skip() {
+            value = null;
+            dataColumn.skip();
+          }
+          @Override
+          void writeValue() {
+            converter.addBinary(value);
+          }
+          @Override
+          public Binary getBinary() {
+            return value;
+          }
+        };
+      }
+      @Override
+      public Binding convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName typeName) {
+        // read as raw bytes, exactly like BINARY
+        return convertBINARY(typeName);
+      }
+    };
+    this.binding = type.convert(bindingFactory);
+  }
+
+  /**
+   * creates a reader for triplets, decodes the dictionary page when the page reader
+   * provides one and positions the reader on the first triplet
+   * @param path the descriptor for the corresponding column
+   * @param pageReader the underlying store to read from
+   * @param converter the converter the values are written to; it receives the dictionary
+   *        when it declares dictionary support
+   * @throws ParquetDecodingException if the dictionary can not be decoded or if the page
+   *         reader reports a total value count of 0
+   */
+  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter) {
+    this.path = checkNotNull(path, "path");
+    this.pageReader = checkNotNull(pageReader, "pageReader");
+    this.converter = checkNotNull(converter, "converter");
+
+    final DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    Dictionary decodedDictionary = null;
+    if (dictionaryPage != null) {
+      try {
+        decodedDictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
+        if (converter.hasDictionarySupport()) {
+          converter.setDictionary(decodedDictionary);
+        }
+      } catch (IOException e) {
+        throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
+      }
+    }
+    this.dictionary = decodedDictionary;
+
+    this.totalValueCount = pageReader.getTotalValueCount();
+    if (this.totalValueCount == 0) {
+      throw new ParquetDecodingException("totalValueCount == 0");
+    }
+    // load the first page and the first repetition/definition levels
+    consume();
+  }
+
+  /**
+   * @return true once as many values have been read as the column contains in total
+   */
+  private boolean isFullyConsumed() {
+    final boolean valuesLeft = readValues < totalValueCount;
+    return !valuesLeft;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#writeCurrentValueToConverter()
+   */
+  @Override
+  public void writeCurrentValueToConverter() {
+    readValue();
+    binding.writeValue();
+  }
+
+  /**
+   * Reads the current value if needed and returns the dictionary id held by the binding.
+   * @return the dictionary id of the current value
+   */
+  @Override
+  public int getCurrentValueDictionaryID() {
+    readValue();
+    final int id = binding.getDictionaryId();
+    return id;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getInteger()
+   */
+  @Override
+  public int getInteger() {
+    readValue();
+    final int value = binding.getInteger();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getBoolean()
+   */
+  @Override
+  public boolean getBoolean() {
+    readValue();
+    final boolean value = binding.getBoolean();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getLong()
+   */
+  @Override
+  public long getLong() {
+    readValue();
+    final long value = binding.getLong();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getBinary()
+   */
+  @Override
+  public Binary getBinary() {
+    readValue();
+    final Binary value = binding.getBinary();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getFloat()
+   */
+  @Override
+  public float getFloat() {
+    readValue();
+    final float value = binding.getFloat();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The current value is read from the page first if that has not happened yet.
+   * @see org.apache.parquet.column.ColumnReader#getDouble()
+   */
+  @Override
+  public double getDouble() {
+    readValue();
+    final double value = binding.getDouble();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the repetition level stored by the last level read.
+   * @see org.apache.parquet.column.ColumnReader#getCurrentRepetitionLevel()
+   */
+  @Override
+  public int getCurrentRepetitionLevel() {
+    return this.repetitionLevel;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the column descriptor this reader was created with.
+   * @see org.apache.parquet.column.ColumnReader#getDescriptor()
+   */
+  @Override
+  public ColumnDescriptor getDescriptor() {
+    return this.path;
+  }
+
+  /**
+   * Reads the value into the binding, unless the current value was already read or skipped.
+   *
+   * @throws ParquetDecodingException wrapping any RuntimeException raised while reading,
+   *         with the position of the value in the column and in the current page
+   */
+  public void readValue() {
+    if (valueRead) {
+      // the value may be requested twice when filtering: only read it once
+      return;
+    }
+    try {
+      binding.read();
+      valueRead = true;
+    } catch (RuntimeException e) {
+      final long positionInPage = readValues - (endOfPageValueCount - pageValueCount);
+      final String message = format(
+          "Can't read value in column %s at value %d out of %d, %d out of %d in currentPage. repetition level: %d, definition level: %d",
+          path, readValues, totalValueCount, positionInPage, pageValueCount, repetitionLevel, definitionLevel);
+      throw new ParquetDecodingException(message, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Does nothing when the current value was already read or skipped.
+   * @see org.apache.parquet.column.ColumnReader#skip()
+   */
+  @Override
+  public void skip() {
+    if (valueRead) {
+      return;
+    }
+    binding.skip();
+    valueRead = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the definition level stored by the last level read.
+   * @see org.apache.parquet.column.ColumnReader#getCurrentDefinitionLevel()
+   */
+  @Override
+  public int getCurrentDefinitionLevel() {
+    return this.definitionLevel;
+  }
+
+  // TODO: change the logic around read() to not tie together reading from the 3 columns
+  /**
+   * Reads the next repetition level, then the next definition level, and counts one more
+   * value as read.
+   */
+  private void readRepetitionAndDefinitionLevels() {
+    this.repetitionLevel = repetitionLevelColumn.nextInt();
+    this.definitionLevel = definitionLevelColumn.nextInt();
+    readValues += 1;
+  }
+
+  /**
+   * Moves to the next triplet: loads the next page when the current one is exhausted and
+   * reads the next levels. Once the whole column is consumed, only the repetition level
+   * is reset to 0.
+   */
+  private void checkRead() {
+    final boolean pageExhausted = isPageFullyConsumed();
+    if (pageExhausted && isFullyConsumed()) {
+      if (DEBUG) LOG.debug("end reached");
+      repetitionLevel = 0; // the next repetition level
+      return;
+    }
+    if (pageExhausted) {
+      readPage();
+    }
+    readRepetitionAndDefinitionLevels();
+  }
+
+  /**
+   * Fetches the next data page from the page reader and dispatches it to the V1 or V2
+   * initialization depending on the page's type.
+   */
+  private void readPage() {
+    if (DEBUG) LOG.debug("loading page");
+    final DataPage nextPage = pageReader.readPage();
+    final DataPage.Visitor<Void> pageInitializer = new DataPage.Visitor<Void>() {
+      @Override
+      public Void visit(DataPageV1 v1Page) {
+        readPageV1(v1Page);
+        return null;
+      }
+      @Override
+      public Void visit(DataPageV2 v2Page) {
+        readPageV2(v2Page);
+        return null;
+      }
+    };
+    nextPage.accept(pageInitializer);
+  }
+
+  /**
+   * Prepares the values reader and the binding for a new page.
+   *
+   * @param dataEncoding the encoding of the values of the page
+   * @param bytes the buffer holding the values
+   * @param offset where the values start in the buffer
+   * @param valueCount the number of values in the page
+   * @throws ParquetDecodingException if the encoding needs a dictionary and none was read,
+   *         or if the values reader can not be initialized from the page
+   */
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+    pageValueCount = valueCount;
+    endOfPageValueCount = readValues + pageValueCount;
+
+    if (!dataEncoding.usesDictionary()) {
+      dataColumn = dataEncoding.getValuesReader(path, VALUES);
+    } else if (dictionary != null) {
+      dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
+    } else {
+      throw new ParquetDecodingException(
+          "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
+    }
+
+    // dictionary ids are only exposed when the converter can consume them
+    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
+      bindToDictionary(dictionary);
+    } else {
+      bind(path.getType());
+    }
+
+    try {
+      dataColumn.initFromPage(pageValueCount, bytes, offset);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page in col " + path, e);
+    }
+  }
+
+  /**
+   * Initializes the repetition level, definition level and values readers from a V1 data
+   * page, in which the three sections are stored back to back in a single buffer.
+   *
+   * @param page the V1 data page to read from
+   * @throws ParquetDecodingException if the page can not be read
+   */
+  private void readPageV1(DataPageV1 page) {
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    // pageValueCount is only refreshed by initDataReader below: at this point it still holds
+    // the count of the previous page (0 for the first page), so the level readers must be
+    // given the count of the page being loaded
+    final int valueCount = page.getValueCount();
+    try {
+      byte[] bytes = page.getBytes().toByteArray();
+      if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + valueCount + " records");
+      if (DEBUG) LOG.debug("reading repetition levels at 0");
+      rlReader.initFromPage(valueCount, bytes, 0);
+      // each section starts where the previous one ends
+      int next = rlReader.getNextOffset();
+      if (DEBUG) LOG.debug("reading definition levels at " + next);
+      dlReader.initFromPage(valueCount, bytes, next);
+      next = dlReader.getNextOffset();
+      if (DEBUG) LOG.debug("reading data at " + next);
+      initDataReader(page.getValueEncoding(), bytes, next, valueCount);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+    }
+  }
+
+  /**
+   * Initializes the repetition level, definition level and values readers from a V2 data
+   * page, in which the levels and the values are provided separately.
+   *
+   * @param page the V2 data page to read from
+   * @throws ParquetDecodingException if the page can not be read
+   */
+  private void readPageV2(DataPageV2 page) {
+    this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
+    this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      // log the count of the page being loaded: pageValueCount still holds the count of
+      // the previous page until initDataReader has run
+      if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + page.getValueCount() + " records");
+      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+    }
+  }
+
+  /**
+   * Creates the iterator over the levels of a V2 page.
+   *
+   * @param maxLevel the maximum level of the column; when 0 the bytes are not read and
+   *        the returned iterator always produces 0
+   * @param bytes the encoded levels
+   * @return an iterator over the decoded levels
+   * @throws ParquetDecodingException if the levels can not be read
+   */
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    if (maxLevel == 0) {
+      return new NullIntIterator();
+    }
+    try {
+      final int bitWidth = BytesUtils.getWidthFromMaxInt(maxLevel);
+      final ByteArrayInputStream levelBytes = new ByteArrayInputStream(bytes.toByteArray());
+      return new RLEIntIterator(new RunLengthBitPackingHybridDecoder(bitWidth, levelBytes));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + path, e);
+    }
+  }
+
+  /**
+   * @return true once the number of values read has reached the end of the current page
+   */
+  private boolean isPageFullyConsumed() {
+    final boolean valuesLeftInPage = readValues < endOfPageValueCount;
+    return !valuesLeftInPage;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Moves to the next triplet and marks its value as not read yet.
+   * @see org.apache.parquet.column.ColumnReader#consume()
+   */
+  @Override
+  public void consume() {
+    checkRead();
+    this.valueRead = false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the count obtained from the page reader at construction time.
+   * @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
+   */
+  @Override
+  public long getTotalValueCount() {
+    return this.totalValueCount;
+  }
+
+  /**
+   * Minimal source of ints, used to read repetition and definition levels.
+   */
+  abstract static class IntIterator {
+    /**
+     * @return the next int
+     */
+    abstract int nextInt();
+  }
+
+  /**
+   * IntIterator reading its ints from a ValuesReader.
+   */
+  static class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    /**
+     * @param delegate the reader every int is read from
+     */
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return the next integer of the underlying reader
+     */
+    @Override
+    int nextInt() {
+      final int next = delegate.readInteger();
+      return next;
+    }
+  }
+
+  /**
+   * IntIterator reading its ints from a RunLengthBitPackingHybridDecoder.
+   */
+  static class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    /**
+     * @param delegate the decoder every int is read from
+     */
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return the next int of the underlying decoder
+     * @throws ParquetDecodingException wrapping the IOException raised by the decoder
+     */
+    @Override
+    int nextInt() {
+      final int next;
+      try {
+        next = delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+      return next;
+    }
+  }
+
+  /**
+   * IntIterator that reads nothing and always produces 0.
+   */
+  private static final class NullIntIterator extends IntIterator {
+    /**
+     * @return always 0
+     */
+    @Override
+    int nextInt() {
+      final int constantLevel = 0;
+      return constantLevel;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
new file mode 100644
index 0000000..a72b6f7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -0,0 +1,135 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+
+/**
+ * {@link ColumnWriteStore} handing out {@link ColumnWriterV1} instances. The writer of a
+ * column is created the first time the column is requested and kept in a map ordered by
+ * column descriptor.
+ */
+public class ColumnWriteStoreV1 implements ColumnWriteStore {
+
+  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
+  private final PageWriteStore pageWriteStore;
+  private final int pageSizeThreshold;
+  private final int dictionaryPageSizeThreshold;
+  private final boolean enableDictionary;
+  private final WriterVersion writerVersion;
+
+  /**
+   * @param pageWriteStore provides the page writer of each column
+   * @param pageSizeThreshold passed as is to every column writer
+   * @param dictionaryPageSizeThreshold passed as is to every column writer
+   * @param enableDictionary passed as is to every column writer
+   * @param writerVersion passed as is to every column writer
+   */
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+    this.pageWriteStore = pageWriteStore;
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
+    this.enableDictionary = enableDictionary;
+    this.writerVersion = writerVersion;
+  }
+
+  /**
+   * @param path the column to write to
+   * @return the writer of the column, created on first request
+   */
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    final ColumnWriterV1 existing = columns.get(path);
+    if (existing != null) {
+      return existing;
+    }
+    final ColumnWriterV1 created = newMemColumn(path);
+    columns.put(path, created);
+    return created;
+  }
+
+  /**
+   * @return the descriptors of the columns a writer was requested for so far
+   */
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  /**
+   * builds the writer of a column on top of the page writer provided by the page write store
+   */
+  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
+    return new ColumnWriterV1(
+        path,
+        pageWriteStore.getPageWriter(path),
+        pageSizeThreshold,
+        dictionaryPageSizeThreshold,
+        enableDictionary,
+        writerVersion);
+  }
+
+  /**
+   * @return one line per column: its path followed by its buffered size in bytes
+   */
+  @Override
+  public String toString() {
+    final StringBuilder description = new StringBuilder();
+    for (Entry<ColumnDescriptor, ColumnWriterV1> column : columns.entrySet()) {
+      description
+          .append(Arrays.toString(column.getKey().getPath()))
+          .append(": ")
+          .append(column.getValue().getBufferedSizeInMemory())
+          .append(" bytes")
+          .append("\n");
+    }
+    return description.toString();
+  }
+
+  /**
+   * @return the sum of the allocated sizes reported by the column writers
+   */
+  @Override
+  public long getAllocatedSize() {
+    long allocated = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      allocated += writer.allocatedSize();
+    }
+    return allocated;
+  }
+
+  /**
+   * @return the sum of the buffered sizes reported by the column writers
+   */
+  @Override
+  public long getBufferedSize() {
+    long buffered = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      buffered += writer.getBufferedSizeInMemory();
+    }
+    return buffered;
+  }
+
+  /**
+   * @return the memory usage strings of all the column writers, wrapped in a "Store" block
+   */
+  @Override
+  public String memUsageString() {
+    final StringBuilder usage = new StringBuilder("Store {\n");
+    for (ColumnWriterV1 writer : columns.values()) {
+      usage.append(writer.memUsageString(" "));
+    }
+    return usage.append("}\n").toString();
+  }
+
+  /**
+   * @return the largest buffered size among the column writers, 0 when there is none
+   */
+  public long maxColMemSize() {
+    long largest = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      largest = Math.max(largest, writer.getBufferedSizeInMemory());
+    }
+    return largest;
+  }
+
+  /**
+   * flushes every column writer
+   */
+  @Override
+  public void flush() {
+    for (ColumnWriterV1 writer : columns.values()) {
+      writer.flush();
+    }
+  }
+
+  @Override
+  public void endRecord() {
+    // V1 does not take record boundaries into account
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
new file mode 100644
index 0000000..fc17a22
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -0,0 +1,166 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * {@link ColumnWriteStore} handing out {@link ColumnWriterV2} instances, all created up
+ * front from the schema. The store counts the records written and, at record boundaries,
+ * periodically checks the size of the current page of every column to decide when pages
+ * must be written.
+ */
+public class ColumnWriteStoreV2 implements ColumnWriteStore {
+
+  // will wait for at least that many records before checking again
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  // will flush even if size below the threshold by this much to facilitate page alignment
+  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
+
+  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
+  private final Collection<ColumnWriterV2> writers;
+  private long rowCount;
+  private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+  private final long thresholdTolerance;
+
+  private final int pageSizeThreshold;
+
+  /**
+   * @param schema the schema whose columns get a writer
+   * @param pageWriteStore provides the page writer of each column
+   * @param pageSizeThreshold the page size aimed for, also passed to every column writer
+   * @param parquetProps passed as is to every column writer
+   */
+  public ColumnWriteStoreV2(
+      MessageType schema,
+      PageWriteStore pageWriteStore,
+      int pageSizeThreshold,
+      ParquetProperties parquetProps) {
+    super();
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
+    }
+    this.columns = unmodifiableMap(mcolumns);
+    this.writers = this.columns.values();
+  }
+
+  /**
+   * @param path the column to write to
+   * @return the writer of the column, null when the column was not part of the schema
+   */
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    return columns.get(path);
+  }
+
+  /**
+   * @return the descriptors of all the columns of this store
+   */
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  /**
+   * @return one line per column: its path followed by its total buffered size in bytes
+   */
+  @Override
+  public String toString() {
+      StringBuilder sb = new StringBuilder();
+      for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
+        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+        sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+        sb.append("\n");
+      }
+      return sb.toString();
+  }
+
+  /**
+   * @return the sum of the allocated sizes reported by the column writers
+   */
+  @Override
+  public long getAllocatedSize() {
+    long total = 0;
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      total += memColumn.allocatedSize();
+    }
+    return total;
+  }
+
+  /**
+   * @return the sum of the total buffered sizes reported by the column writers
+   */
+  @Override
+  public long getBufferedSize() {
+    long total = 0;
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      total += memColumn.getTotalBufferedSize();
+    }
+    return total;
+  }
+
+  /**
+   * Writes a last page for every column that has rows not yet written to a page, then
+   * finalizes the column chunk of every column.
+   */
+  @Override
+  public void flush() {
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      long rows = rowCount - memColumn.getRowsWrittenSoFar();
+      if (rows > 0) {
+        memColumn.writePage(rowCount);
+      }
+      memColumn.finalizeColumnChunk();
+    }
+  }
+
+  /**
+   * @return the memory usage strings of all the column writers, wrapped in a "Store" block
+   */
+  @Override
+  public String memUsageString() {
+    StringBuilder b = new StringBuilder("Store {\n");
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      b.append(memColumn.memUsageString(" "));
+    }
+    b.append("}\n");
+    return b.toString();
+  }
+
+  /**
+   * Counts one more record and runs the page size check once the record count planned by
+   * the previous check is reached.
+   */
+  @Override
+  public void endRecord() {
+    ++ rowCount;
+    if (rowCount >= rowCountForNextSizeCheck) {
+      sizeCheck();
+    }
+  }
+
+  /**
+   * Writes the current page of every column that is within the tolerance of the page size
+   * threshold, estimates how many more rows each column needs to fill its page, and plans
+   * the next check halfway to the smallest estimate, bounded by
+   * MINIMUM_RECORD_COUNT_FOR_CHECK and MAXIMUM_RECORD_COUNT_FOR_CHECK.
+   */
+  private void sizeCheck() {
+    long minRecordToWait = Long.MAX_VALUE;
+    for (ColumnWriterV2 writer : writers) {
+      long usedMem = writer.getCurrentPageBufferedSize();
+      long rows = rowCount - writer.getRowsWrittenSoFar();
+      long remainingMem = pageSizeThreshold - usedMem;
+      if (remainingMem <= thresholdTolerance) {
+        writer.writePage(rowCount);
+        // a new page starts: the whole threshold is available again
+        remainingMem = pageSizeThreshold;
+      }
+      // rows per byte observed so far, times the bytes left. The division has to happen in
+      // floating point: with integer operands rows / usedMem is 0 as soon as a row takes
+      // more than one byte, which would make the estimate always 0
+      long rowsToFillPage =
+          usedMem == 0 ?
+              MAXIMUM_RECORD_COUNT_FOR_CHECK
+              : (long)((double)rows / usedMem * remainingMem);
+      if (rowsToFillPage < minRecordToWait) {
+        minRecordToWait = rowsToFillPage;
+      }
+    }
+    if (minRecordToWait == Long.MAX_VALUE) {
+      // no column at all: fall back to the minimum
+      minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
+    }
+    // will check again halfway
+    rowCountForNextSizeCheck = rowCount +
+        min(
+            max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
+            MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
new file mode 100644
index 0000000..f4079c7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -0,0 +1,278 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static org.apache.parquet.bytes.BytesInput.concat;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+import static java.lang.Math.max;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV1 implements ColumnWriter {
+  private static final Log LOG = Log.getLog(ColumnWriterV1.class);
+  private static final boolean DEBUG = Log.DEBUG;
+  private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
+  private static final int MIN_SLAB_SIZE = 64;
+
+  private final ColumnDescriptor path;
+  private final PageWriter pageWriter;
+  private final long pageSizeThreshold;
+  private ValuesWriter repetitionLevelColumn;
+  private ValuesWriter definitionLevelColumn;
+  private ValuesWriter dataColumn;
+  private int valueCount;
+  private int valueCountForNextSizeCheck;
+
+  private Statistics statistics;
+
+  public ColumnWriterV1(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      int pageSizeThreshold,
+      int dictionaryPageSizeThreshold,
+      boolean enableDictionary,
+      WriterVersion writerVersion) {
+    this.path = path;
+    this.pageWriter = pageWriter;
+    this.pageSizeThreshold = pageSizeThreshold;
+    // initial check of memory usage. So that we have enough data to make an initial prediction
+    this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
+    resetStatistics();
+
+    ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+
+    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
+    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
+  }
+
+  private void log(Object value, int r, int d) {
+    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+  }
+
+  private void resetStatistics() {
+    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+  }
+
+  /**
+   * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
+   *
+   * We measure the memory used when we reach the mid point toward our estimated count.
+   * We then update the estimate and flush the page if we reached the threshold.
+   *
+   * That way we check the memory size log2(n) times.
+   *
+   */
+  private void accountForValueWritten() {
+    ++ valueCount;
+    if (valueCount > valueCountForNextSizeCheck) {
+      // not checking the memory used for every value
+      long memSize = repetitionLevelColumn.getBufferedSize()
+          + definitionLevelColumn.getBufferedSize()
+          + dataColumn.getBufferedSize();
+      if (memSize > pageSizeThreshold) {
+        // we will write the current page and check again the size at the predicted middle of next page
+        valueCountForNextSizeCheck = valueCount / 2;
+        writePage();
+      } else {
+        // not reached the threshold, will check again midway
+        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
+      }
+    }
+  }
+
+  private void updateStatisticsNumNulls() {
+    statistics.incrementNumNulls();
+  }
+
+  private void updateStatistics(int value) {
+    statistics.updateStats(value);
+  }
+
+  private void updateStatistics(long value) {
+    statistics.updateStats(value);
+  }
+
+  private void updateStatistics(float value) {
+    statistics.updateStats(value);
+  }
+
+  private void updateStatistics(double value) {
+   statistics.updateStats(value);
+  }
+
+  private void updateStatistics(Binary value) {
+   statistics.updateStats(value);
+  }
+
+  private void updateStatistics(boolean value) {
+   statistics.updateStats(value);
+  }
+
+  private void writePage() {
+    if (DEBUG) LOG.debug("write page");
+    try {
+      pageWriter.writePage(
+          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
+          valueCount,
+          statistics,
+          repetitionLevelColumn.getEncoding(),
+          definitionLevelColumn.getEncoding(),
+          dataColumn.getEncoding());
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page for " + path, e);
+    }
+    repetitionLevelColumn.reset();
+    definitionLevelColumn.reset();
+    dataColumn.reset();
+    valueCount = 0;
+    resetStatistics();
+  }
+
+  @Override
+  public void writeNull(int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(null, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    updateStatisticsNumNulls();
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(double value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeDouble(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(float value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeFloat(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(Binary value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeBytes(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(boolean value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeBoolean(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(int value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeInteger(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  @Override
+  public void write(long value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+    definitionLevelColumn.writeInteger(definitionLevel);
+    dataColumn.writeLong(value);
+    updateStatistics(value);
+    accountForValueWritten();
+  }
+
+  public void flush() {
+    if (valueCount > 0) {
+      writePage();
+    }
+    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    if (dictionaryPage != null) {
+      if (DEBUG) LOG.debug("write dictionary");
+      try {
+        pageWriter.writeDictionaryPage(dictionaryPage);
+      } catch (IOException e) {
+        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+      }
+      dataColumn.resetDictionary();
+    }
+  }
+
+  public long getBufferedSizeInMemory() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize()
+        + pageWriter.getMemSize();
+  }
+
+  public long allocatedSize() {
+    return repetitionLevelColumn.getAllocatedSize()
+    + definitionLevelColumn.getAllocatedSize()
+    + dataColumn.getAllocatedSize()
+    + pageWriter.allocatedSize();
+  }
+
+  public String memUsageString(String indent) {
+    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+    b.append(repetitionLevelColumn.memUsageString(indent + "  r:")).append("\n");
+    b.append(definitionLevelColumn.memUsageString(indent + "  d:")).append("\n");
+    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
+    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
+    b.append(indent).append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
+    b.append(indent).append("}\n");
+    return b.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
new file mode 100644
index 0000000..5e936a2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -0,0 +1,304 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.Math.max;
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
+import java.io.IOException;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV2 implements ColumnWriter {
+  private static final Log LOG = Log.getLog(ColumnWriterV2.class);
+  private static final boolean DEBUG = Log.DEBUG;
+  private static final int MIN_SLAB_SIZE = 64;
+
+  private final ColumnDescriptor path;
+  private final PageWriter pageWriter;
+  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
+  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
+  private ValuesWriter dataColumn;
+  private int valueCount;
+
+  private Statistics<?> statistics;
+  private long rowsWrittenSoFar = 0;
+
+  public ColumnWriterV2(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      ParquetProperties parquetProps,
+      int pageSize) {
+    this.path = path;
+    this.pageWriter = pageWriter;
+    resetStatistics();
+
+    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
+    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
+
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
+    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
+  }
+
+  private void log(Object value, int r, int d) {
+    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+  }
+
+  private void resetStatistics() {
+    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+  }
+
+  private void definitionLevel(int definitionLevel) {
+    try {
+      definitionLevelColumn.writeInt(definitionLevel);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
+    }
+  }
+
+  private void repetitionLevel(int repetitionLevel) {
+    try {
+      repetitionLevelColumn.writeInt(repetitionLevel);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
+    }
+  }
+
+  /**
+   * writes the current null value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void writeNull(int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(null, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    statistics.incrementNumNulls();
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(double value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeDouble(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(float value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeFloat(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(Binary value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBytes(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(boolean value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBoolean(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(int value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeInteger(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  public void write(long value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeLong(value);
+    statistics.updateStats(value);
+    ++ valueCount;
+  }
+
+  /**
+   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
+   * Is called right after writePage
+   */
+  public void finalizeColumnChunk() {
+    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    if (dictionaryPage != null) {
+      if (DEBUG) LOG.debug("write dictionary");
+      try {
+        pageWriter.writeDictionaryPage(dictionaryPage);
+      } catch (IOException e) {
+        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+      }
+      dataColumn.resetDictionary();
+    }
+  }
+
+  /**
+   * used to decide when to write a page
+   * @return the number of bytes of memory used to buffer the current data
+   */
+  public long getCurrentPageBufferedSize() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize();
+  }
+
+  /**
+   * used to decide when to write a page or row group
+   * @return the number of bytes of memory used to buffer the current data and the previously written pages
+   */
+  public long getTotalBufferedSize() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize()
+        + pageWriter.getMemSize();
+  }
+
+  /**
+   * @return actual memory used
+   */
+  public long allocatedSize() {
+    return repetitionLevelColumn.getAllocatedSize()
+    + definitionLevelColumn.getAllocatedSize()
+    + dataColumn.getAllocatedSize()
+    + pageWriter.allocatedSize();
+  }
+
+  /**
+   * @param indent a prefix to format lines
+   * @return a formatted string showing how memory is used
+   */
+  public String memUsageString(String indent) {
+    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
+    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
+    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
+    b.append(indent).append("}\n");
+    return b.toString();
+  }
+
+  public long getRowsWrittenSoFar() {
+    return this.rowsWrittenSoFar;
+  }
+
+  /**
+   * writes the current data to a new page in the page store
+   * @param rowCount how many rows have been written so far
+   */
+  public void writePage(long rowCount) {
+    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
+    this.rowsWrittenSoFar = rowCount;
+    if (DEBUG) LOG.debug("write page");
+    try {
+      // TODO: rework this API. Those must be called *in that order*
+      BytesInput bytes = dataColumn.getBytes();
+      Encoding encoding = dataColumn.getEncoding();
+      pageWriter.writePageV2(
+          pageRowCount,
+          Ints.checkedCast(statistics.getNumNulls()),
+          valueCount,
+          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
+          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
+          encoding,
+          bytes,
+          statistics
+          );
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page for " + path, e);
+    }
+    repetitionLevelColumn.reset();
+    definitionLevelColumn.reset();
+    dataColumn.reset();
+    valueCount = 0;
+    resetStatistics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
new file mode 100644
index 0000000..9f11490
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
@@ -0,0 +1,53 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * one data page in a chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class DataPage extends Page {
+
+  private final int valueCount;
+
+  DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+    super(compressedSize, uncompressedSize);
+    this.valueCount = valueCount;
+  }
+
+  /**
+   * @return the number of values in that page
+   */
+  public int getValueCount() {
+    return valueCount;
+  }
+
+  public abstract <T> T accept(Visitor<T> visitor);
+
+  public static interface Visitor<T> {
+
+    T visit(DataPageV1 dataPageV1);
+
+    T visit(DataPageV2 dataPageV2);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
new file mode 100644
index 0000000..2206517
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
@@ -0,0 +1,98 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+public class DataPageV1 extends DataPage {
+
+  private final BytesInput bytes;
+  private final Statistics<?> statistics;
+  private final Encoding rlEncoding;
+  private final Encoding dlEncoding;
+  private final Encoding valuesEncoding;
+
+  /**
+   * Creates a V1 data page; the size passed to the superclass as compressed size is {@code bytes.size()}.
+   * @param bytes the bytes for this page
+   * @param valueCount count of values in this page
+   * @param uncompressedSize the uncompressed size of the page
+   * @param stats statistics of the page's values (max, min, num_null)
+   * @param rlEncoding the repetition level encoding for this page
+   * @param dlEncoding the definition level encoding for this page
+   * @param valuesEncoding the values encoding for this page
+   */
+  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
+    this.bytes = bytes;
+    this.statistics = stats;
+    this.rlEncoding = rlEncoding;
+    this.dlEncoding = dlEncoding;
+    this.valuesEncoding = valuesEncoding;
+  }
+
+  /**
+   * @return the bytes for the page
+   */
+  public BytesInput getBytes() {
+    return bytes;
+  }
+
+  /**
+   *
+   * @return the statistics for this page (max, min, num_nulls)
+   */
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  /**
+   * @return the definition level encoding for this page
+   */
+  public Encoding getDlEncoding() {
+    return dlEncoding;
+  }
+
+  /**
+   * @return the repetition level encoding for this page
+   */
+  public Encoding getRlEncoding() {
+    return rlEncoding;
+  }
+
+  /**
+   * @return the values encoding for this page
+   */
+  public Encoding getValueEncoding() {
+    return valuesEncoding;
+  }
+
+  @Override
+  public String toString() {
+    return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
new file mode 100644
index 0000000..13b64c3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
@@ -0,0 +1,156 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+public class DataPageV2 extends DataPage {
+
+  /**
+   * @param rowCount
+   * @param nullCount
+   * @param valueCount
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding
+   * @param statistics optional statistics for this page
+   * @return an uncompressed page
+   */
+  public static DataPageV2 uncompressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      Statistics<?> statistics) {
+    return new DataPageV2(
+        rowCount, nullCount, valueCount,
+        repetitionLevels, definitionLevels,
+        dataEncoding, data,
+        Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+        statistics,
+        false);
+  }
+
+  /**
+   * @param rowCount
+   * @param nullCount
+   * @param valueCount
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding and compressed
+   * @param uncompressedSize total size uncompressed (rl + dl + data)
+   * @param statistics optional statistics for this page
+   * @return a compressed page
+   */
+  public static DataPageV2 compressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize,
+      Statistics<?> statistics) {
+    return new DataPageV2(
+        rowCount, nullCount, valueCount,
+        repetitionLevels, definitionLevels,
+        dataEncoding, data,
+        uncompressedSize,
+        statistics,
+        true);
+  }
+
+  private final int rowCount;
+  private final int nullCount;
+  private final BytesInput repetitionLevels;
+  private final BytesInput definitionLevels;
+  private final Encoding dataEncoding;
+  private final BytesInput data;
+  private final Statistics<?> statistics;
+  private final boolean isCompressed;
+
+  public DataPageV2(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize,
+      Statistics<?> statistics,
+      boolean isCompressed) {
+    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
+    this.rowCount = rowCount;
+    this.nullCount = nullCount;
+    this.repetitionLevels = repetitionLevels;
+    this.definitionLevels = definitionLevels;
+    this.dataEncoding = dataEncoding;
+    this.data = data;
+    this.statistics = statistics;
+    this.isCompressed = isCompressed;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public int getNullCount() {
+    return nullCount;
+  }
+
+  public BytesInput getRepetitionLevels() {
+    return repetitionLevels;
+  }
+
+  public BytesInput getDefinitionLevels() {
+    return definitionLevels;
+  }
+
+  public Encoding getDataEncoding() {
+    return dataEncoding;
+  }
+
+  public BytesInput getData() {
+    return data;
+  }
+
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  public boolean isCompressed() {
+    return isCompressed;
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+
+  @Override
+  public String toString() {
+    return "Page V2 ["
+        + "dl size=" + definitionLevels.size() + ", "
+        + "rl size=" + repetitionLevels.size() + ", "
+        + "data size=" + data.size() + ", "
+        + "data enc=" + dataEncoding + ", "
+        + "valueCount=" + getValueCount() + ", "
+        + "rowCount=" + getRowCount() + ", "
+        + "is compressed=" + isCompressed + ", "
+        + "uncompressedSize=" + getUncompressedSize() + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
new file mode 100644
index 0000000..306d81b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
@@ -0,0 +1,88 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+
+/**
+ * Data for a dictionary page: the page bytes, the number of dictionary entries and their encoding.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class DictionaryPage extends Page {
+
+  private final BytesInput bytes;
+  private final int dictionarySize;
+  private final Encoding encoding;
+
+  /**
+   * creates an uncompressed page; bytes.size() is passed on as the uncompressed size
+   * @param bytes the content of the page
+   * @param dictionarySize the value count in the dictionary
+   * @param encoding the encoding used
+   */
+  public DictionaryPage(BytesInput bytes, int dictionarySize, Encoding encoding) {
+    this(bytes, (int)bytes.size(), dictionarySize, encoding); // TODO: fix sizes long or int (plain cast here; the other constructor uses Ints.checkedCast)
+  }
+
+  /**
+   * creates a dictionary page; bytes.size() is passed to Page as the compressed size
+   * @param bytes the (possibly compressed) content of the page, checked with checkNotNull
+   * @param uncompressedSize the size uncompressed
+   * @param dictionarySize the value count in the dictionary
+   * @param encoding the encoding used, checked with checkNotNull
+   */
+  public DictionaryPage(BytesInput bytes, int uncompressedSize, int dictionarySize, Encoding encoding) {
+    super(Ints.checkedCast(bytes.size()), uncompressedSize); // NOTE(review): bytes is dereferenced here, before the checkNotNull below runs
+    this.bytes = checkNotNull(bytes, "bytes");
+    this.dictionarySize = dictionarySize;
+    this.encoding = checkNotNull(encoding, "encoding");
+  }
+  /** returns the (possibly compressed) content of the page, as given to the constructor */
+  public BytesInput getBytes() {
+    return bytes;
+  }
+  /** returns the value count in the dictionary */
+  public int getDictionarySize() {
+    return dictionarySize;
+  }
+  /** returns the encoding used */
+  public Encoding getEncoding() {
+    return encoding;
+  }
+  /** returns a new page built from BytesInput.copy(bytes) with the same sizes and encoding */
+  public DictionaryPage copy() throws IOException {
+    return new DictionaryPage(BytesInput.copy(bytes), getUncompressedSize(), dictionarySize, encoding);
+  }
+
+  /** one-line summary: byte size, entry count, uncompressed size and encoding */
+  @Override
+  public String toString() {
+    return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + getUncompressedSize() + ", encoding=" + encoding + "]";
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
new file mode 100644
index 0000000..3c6b012
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
@@ -0,0 +1,49 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * one page in a chunk; base class holding a compressed and an uncompressed size
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class Page {
+
+  private final int compressedSize;
+  private final int uncompressedSize;
+  /** package-private constructor: stores both sizes exactly as given */
+  Page(int compressedSize, int uncompressedSize) {
+    super();
+    this.compressedSize = compressedSize;
+    this.uncompressedSize = uncompressedSize;
+  }
+  /** returns the compressed size given at construction */
+  public int getCompressedSize() {
+    return compressedSize;
+  }
+
+  /**
+   * @return the uncompressed size given at construction (the size of the page when the bytes are compressed)
+   */
+  public int getUncompressedSize() {
+    return uncompressedSize;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
new file mode 100644
index 0000000..3cfe624
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -0,0 +1,46 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * contains all the readers for all the columns of the corresponding row group
+ *
+ * TODO: rename to RowGroup?
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageReadStore {
+
+  /**
+   * looks up the page reader of a single column
+   * @param descriptor the descriptor of the column
+   * @return the page reader for that column
+   */
+  PageReader getPageReader(ColumnDescriptor descriptor);
+
+  /**
+   * gives the size of the row group in rows
+   * @return the total number of rows in that row group
+   */
+  long getRowCount();
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
new file mode 100644
index 0000000..94c9cb7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
@@ -0,0 +1,43 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * Reader for the sequence of pages of a given column chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageReader {
+
+  /**
+   * @return the dictionary page in that chunk or null if none
+   */
+  DictionaryPage readDictionaryPage();
+
+  /**
+   * @return the total number of values in the column chunk
+   */
+  long getTotalValueCount();
+
+  /**
+   * @return the next page in that chunk or null if after the last page
+   */
+  DataPage readPage();
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
new file mode 100644
index 0000000..2de9db9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -0,0 +1,38 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * contains all the writers for the columns in the corresponding row group
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageWriteStore {
+
+  /**
+   * looks up the page writer of a single column
+   * @param path the descriptor for the column
+   * @return the corresponding page writer
+   */
+  PageWriter getPageWriter(ColumnDescriptor path);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
new file mode 100644
index 0000000..4ad7d9f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
@@ -0,0 +1,89 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+/**
+ * a writer for all the pages of a given column chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageWriter {
+
+  /**
+   * writes a single page
+   * @param bytesInput the bytes for the page
+   * @param valueCount the number of values in that page
+   * @param statistics the statistics for that page
+   * @param rlEncoding repetition level encoding
+   * @param dlEncoding definition level encoding
+   * @param valuesEncoding values encoding
+   * @throws IOException
+   */
+  void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+
+  /**
+   * writes a single page in the new format
+   *
+   * @param rowCount the number of rows in this page
+   * @param nullCount the number of null values (out of valueCount)
+   * @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
+   * @param repetitionLevels the repetition levels encoded in RLE without any size header
+   * @param definitionLevels the definition levels encoded in RLE without any size header
+   * @param dataEncoding the encoding for the data
+   * @param data the data encoded with dataEncoding
+   * @param statistics optional stats for this page
+   * @throws IOException
+   */
+  void writePageV2(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput data,
+      Statistics<?> statistics) throws IOException;
+
+  /**
+   * @return the current size used in the memory buffer for that column chunk
+   */
+  long getMemSize();
+
+  /**
+   * @return the allocated size for the buffer ( > getMemSize() )
+   */
+  long allocatedSize();
+
+  /**
+   * writes a dictionary page
+   * @param dictionaryPage the dictionary page containing the dictionary data
+   */
+  void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException;
+
+  /**
+   * @param prefix a prefix header to add at every line
+   * @return a string presenting a summary of how memory is used
+   */
+  String memUsageString(String prefix);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
new file mode 100644
index 0000000..b2d9b55
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.io.api.Binary;
+/** Min/max statistics over Binary values. */
+public class BinaryStatistics extends Statistics<Binary> {
+
+  private Binary max;
+  private Binary min;
+  /** Folds one value into the stats: it initializes min and max when hasNonNullValue() is false, otherwise widens them. */
+  @Override
+  public void updateStats(Binary value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+  /** Merges the min/max of another BinaryStatistics into this one; stats is cast without a type check. */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    BinaryStatistics binaryStats = (BinaryStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(binaryStats.getMin(), binaryStats.getMax());
+    } else {
+      updateStats(binaryStats.getMin(), binaryStats.getMax());
+    }
+  }
+  /** Sets min and max from byte arrays via Binary.fromByteArray and marks the stats as not empty. */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Binary.fromByteArray(maxBytes);
+    min = Binary.fromByteArray(minBytes);
+    this.markAsNotEmpty();
+  }
+  /** Returns max.getBytes(). NOTE(review): max is dereferenced without a null check - confirm callers only call this once max is set. */
+  @Override
+  public byte[] getMaxBytes() {
+    return max.getBytes();
+  }
+  /** Returns min.getBytes(). NOTE(review): min is dereferenced without a null check - confirm callers only call this once min is set. */
+  @Override
+  public byte[] getMinBytes() {
+    return min.getBytes();
+  }
+  /** Renders min/max plus null count, the null count alone, or a "no stats" message, depending on hasNonNullValue() and isEmpty(). */
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %s, max: %s, num_nulls: %d", min.toStringUsingUTF8(), max.toStringUsingUTF8(), this.getNumNulls());
+   else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+   else
+      return "no stats for this column";
+  }
+  /** Widens the range using Binary.compareTo: lowers min and/or raises max. Dereferences the current min and max, so they must already be set. */
+  public void updateStats(Binary min_value, Binary max_value) {
+    if (min.compareTo(min_value) > 0) { min = min_value; }
+    if (max.compareTo(max_value) < 0) { max = max_value; }
+  }
+  /** Overwrites min and max with the given values and marks the stats as not empty. */
+  public void initializeStats(Binary min_value, Binary max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+  /** Returns the current min field (same value as getMin()). */
+  @Override
+  public Binary genericGetMin() {
+    return min;
+  }
+  /** Returns the current max field (same value as getMax()). */
+  @Override
+  public Binary genericGetMax() {
+    return max;
+  }
+  /** Returns the current max field. */
+  public Binary getMax() {
+    return max;
+  }
+  /** Returns the current min field. */
+  public Binary getMin() {
+    return min;
+  }
+  /** Overwrites min and max (note the argument order: min first) and marks the stats as not empty. */
+  public void setMinMax(Binary min, Binary max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
new file mode 100644
index 0000000..1d02c74
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+/** Min/max statistics over boolean values. */
+public class BooleanStatistics extends Statistics<Boolean> {
+
+  private boolean max;
+  private boolean min;
+  /** Folds one value into the stats: it initializes min and max when hasNonNullValue() is false, otherwise widens them. */
+  @Override
+  public void updateStats(boolean value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+  /** Merges the min/max of another BooleanStatistics into this one; stats is cast without a type check. */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    BooleanStatistics boolStats = (BooleanStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(boolStats.getMin(), boolStats.getMax());
+    } else {
+      updateStats(boolStats.getMin(), boolStats.getMax());
+    }
+  }
+  /** Sets min and max from byte arrays via BytesUtils.bytesToBool and marks the stats as not empty. */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = BytesUtils.bytesToBool(maxBytes);
+    min = BytesUtils.bytesToBool(minBytes);
+    this.markAsNotEmpty();
+  }
+  /** Returns max converted with BytesUtils.booleanToBytes. */
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.booleanToBytes(max);
+  }
+  /** Returns min converted with BytesUtils.booleanToBytes. */
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.booleanToBytes(min);
+  }
+  /** Renders min/max plus null count, the null count alone, or a "no stats" message, depending on hasNonNullValue() and isEmpty(). */
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %b, max: %b, num_nulls: %d", min, max, this.getNumNulls());
+    else if(!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    else  
+      return "no stats for this column";
+  }
+  /** Widens the range: min drops to false when min_value is false, max rises to true when max_value is true. */
+  public void updateStats(boolean min_value, boolean max_value) {
+    if (min && !min_value) { min = min_value; }
+    if (!max && max_value) { max = max_value; }
+  }
+  /** Overwrites min and max with the given values and marks the stats as not empty. */
+  public void initializeStats(boolean min_value, boolean max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+  /** Returns the current min field, boxed (same value as getMin()). */
+  @Override
+  public Boolean genericGetMin() {
+    return min;
+  }
+  /** Returns the current max field, boxed (same value as getMax()). */
+  @Override
+  public Boolean genericGetMax() {
+    return max;
+  }
+  /** Returns the current max field. */
+  public boolean getMax() {
+    return max;
+  }
+  /** Returns the current min field. */
+  public boolean getMin() {
+    return min;
+  }
+  /** Overwrites min and max (note the argument order: min first) and marks the stats as not empty. */
+  public void setMinMax(boolean min, boolean max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}