You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:32 UTC

[35/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/Encoding.java b/parquet-column/src/main/java/parquet/column/Encoding.java
deleted file mode 100644
index 7aa5c1d..0000000
--- a/parquet-column/src/main/java/parquet/column/Encoding.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import static parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
-
-import java.io.IOException;
-
-import parquet.bytes.BytesUtils;
-import parquet.column.page.DictionaryPage;
-import parquet.column.values.ValuesReader;
-import parquet.column.values.bitpacking.ByteBitPackingValuesReader;
-import parquet.column.values.boundedint.ZeroIntegerValuesReader;
-import parquet.column.values.delta.DeltaBinaryPackingValuesReader;
-import parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
-import parquet.column.values.deltastrings.DeltaByteArrayReader;
-import parquet.column.values.dictionary.DictionaryValuesReader;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainBinaryDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainDoubleDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainFloatDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainIntegerDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainLongDictionary;
-import parquet.column.values.plain.BinaryPlainValuesReader;
-import parquet.column.values.plain.FixedLenByteArrayPlainValuesReader;
-import parquet.column.values.plain.BooleanPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
-import parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
-import parquet.io.ParquetDecodingException;
-
-/**
- * Encodings of the data. Each constant overrides the reader/dictionary factory
- * methods it supports; the base implementations throw UnsupportedOperationException.
- *
- * @author Julien Le Dem
- */
-public enum Encoding {
-
-  PLAIN {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-      switch (descriptor.getType()) {
-      case BOOLEAN:
-        return new BooleanPlainValuesReader();
-      case BINARY:
-        return new BinaryPlainValuesReader();
-      case FLOAT:
-        return new FloatPlainValuesReader();
-      case DOUBLE:
-        return new DoublePlainValuesReader();
-      case INT32:
-        return new IntegerPlainValuesReader();
-      case INT64:
-        return new LongPlainValuesReader();
-      case INT96:
-        return new FixedLenByteArrayPlainValuesReader(12);
-      case FIXED_LEN_BYTE_ARRAY:
-        return new FixedLenByteArrayPlainValuesReader(descriptor.getTypeLength());
-      default:
-        throw new ParquetDecodingException("no plain reader for type " + descriptor.getType());
-      }
-    }
-
-    @Override
-    public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
-      switch (descriptor.getType()) {
-      case BINARY:
-        return new PlainBinaryDictionary(dictionaryPage);
-      case FIXED_LEN_BYTE_ARRAY:
-        return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
-      case INT96:
-        return new PlainBinaryDictionary(dictionaryPage, 12);
-      case INT64:
-        return new PlainLongDictionary(dictionaryPage);
-      case DOUBLE:
-        return new PlainDoubleDictionary(dictionaryPage);
-      case INT32:
-        return new PlainIntegerDictionary(dictionaryPage);
-      case FLOAT:
-        return new PlainFloatDictionary(dictionaryPage);
-      default:
-        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
-      }
-
-    }
-  },
-
-  /**
-   * Actually a combination of bit packing and run length encoding.
-   * TODO: Should we rename this to be more clear?
-   */
-  RLE {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-      int bitWidth = BytesUtils.getWidthFromMaxInt(getMaxLevel(descriptor, valuesType));
-      if(bitWidth == 0) {
-        return new ZeroIntegerValuesReader();
-      }
-      return new RunLengthBitPackingHybridValuesReader(bitWidth);
-    }
-  },
-
-  /**
-   * @deprecated This is no longer used, and has been replaced by {@link #RLE}
-   * which is a combination of bit packing and rle
-   */
-  @Deprecated
-  BIT_PACKED {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-      return new ByteBitPackingValuesReader(getMaxLevel(descriptor, valuesType), BIG_ENDIAN);
-    }
-  },
-
-  /**
-   * @deprecated now replaced by RLE_DICTIONARY for the data page encoding and PLAIN for the dictionary page encoding
-   */
-  @Deprecated
-  PLAIN_DICTIONARY {
-    @Override
-    public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
-      return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
-    }
-
-    @Override
-    public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
-      return PLAIN.initDictionary(descriptor, dictionaryPage);
-    }
-
-    @Override
-    public boolean usesDictionary() {
-      return true;
-    }
-
-  },
-
-  /**
-   * Delta encoding for integers. A reader is only provided for INT32 columns;
-   * works best on sorted data
-   */
-  DELTA_BINARY_PACKED {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-      if(descriptor.getType() != INT32) {
-        throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
-      }
-      return new DeltaBinaryPackingValuesReader();
-    }
-  },
-
-  /**
-   * Encoding for byte arrays to separate the length values and the data. The lengths
-   * are encoded using DELTA_BINARY_PACKED
-   */
-  DELTA_LENGTH_BYTE_ARRAY {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor,
-        ValuesType valuesType) {
-      if (descriptor.getType() != BINARY) {
-        throw new ParquetDecodingException("Encoding DELTA_LENGTH_BYTE_ARRAY is only supported for type BINARY");
-      }
-      return new DeltaLengthByteArrayValuesReader();
-    }
-  },
-
-  /**
-   * Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
-   * Suffixes are stored as delta length byte arrays.
-   */
-  DELTA_BYTE_ARRAY {
-    @Override
-    public ValuesReader getValuesReader(ColumnDescriptor descriptor,
-        ValuesType valuesType) {
-      if (descriptor.getType() != BINARY) {
-        throw new ParquetDecodingException("Encoding DELTA_BYTE_ARRAY is only supported for type BINARY");
-      }
-      return new DeltaByteArrayReader();
-    }
-  },
-
-  /**
-   * Dictionary encoding: the ids are encoded using the RLE encoding
-   */
-  RLE_DICTIONARY {
-
-    @Override
-    public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
-      switch (descriptor.getType()) {
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-      case INT96:
-      case INT64:
-      case DOUBLE:
-      case INT32:
-      case FLOAT:
-        return new DictionaryValuesReader(dictionary);
-      default:
-        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
-      }
-    }
-
-    @Override
-    public boolean usesDictionary() {
-      return true;
-    }
-
-  };
-
-  int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) { // largest value to encode: the max level, or 1 for BOOLEAN values
-    int maxLevel;
-    switch (valuesType) {
-    case REPETITION_LEVEL:
-      maxLevel = descriptor.getMaxRepetitionLevel();
-      break;
-    case DEFINITION_LEVEL:
-      maxLevel = descriptor.getMaxDefinitionLevel();
-      break;
-    case VALUES:
-      if(descriptor.getType() == BOOLEAN) {
-        maxLevel = 1;
-        break;
-      } // intentional fall-through: non-BOOLEAN VALUES reach the default case and throw
-    default:
-      throw new ParquetDecodingException("Unsupported encoding for values: " + this);
-    }
-    return maxLevel;
-  }
-
-  /**
-   * @return whether this encoding requires a dictionary
-   */
-  public boolean usesDictionary() {
-    return false;
-  }
-
-  /**
-   * initializes a dictionary for the column {@code descriptor} from a page
-   * @param dictionaryPage the dictionary page to read
-   * @return the corresponding dictionary (this default implementation always throws UnsupportedOperationException)
-   */
-  public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
-    throw new UnsupportedOperationException(this.name() + " does not support dictionary");
-  }
-
-  /**
-   * To read decoded values that don't require a dictionary
-   *
-   * @param descriptor the column to read
-   * @param valuesType the type of values
-   * @return the proper values reader for the given column
-   * @throws UnsupportedOperationException if the encoding is dictionary based
-   */
-  public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-    throw new UnsupportedOperationException("Error decoding " + descriptor + ". " + this.name() + " is dictionary based");
-  }
-
-  /**
-   * To read decoded values that require a dictionary
-   *
-   * @param descriptor the column to read
-   * @param valuesType the type of values
-   * @param dictionary the dictionary
-   * @return the proper values reader for the given column
-   * @throws UnsupportedOperationException if the encoding is not dictionary based
-   */
-  public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
-    throw new UnsupportedOperationException(this.name() + " is not dictionary based");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
deleted file mode 100644
index 31bec36..0000000
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
-import static parquet.column.Encoding.PLAIN;
-import static parquet.column.Encoding.PLAIN_DICTIONARY;
-import static parquet.column.Encoding.RLE_DICTIONARY;
-import parquet.column.impl.ColumnWriteStoreV1;
-import parquet.column.impl.ColumnWriteStoreV2;
-import parquet.column.page.PageWriteStore;
-import parquet.column.values.ValuesWriter;
-import parquet.column.values.boundedint.DevNullValuesWriter;
-import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
-import parquet.column.values.deltastrings.DeltaByteArrayWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
-import parquet.column.values.fallback.FallbackValuesWriter;
-import parquet.column.values.plain.BooleanPlainValuesWriter;
-import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
-import parquet.column.values.plain.PlainValuesWriter;
-import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import parquet.schema.MessageType;
-
-/**
- * This class represents all the configurable Parquet properties.
- * It also builds the values writers and the column write store that match them.
- *
- * @author amokashi
- */
-public class ParquetProperties {
-
-  public enum WriterVersion {
-    PARQUET_1_0 ("v1"),
-    PARQUET_2_0 ("v2");
-
-    private final String shortName;
-
-    WriterVersion(String shortname) {
-      this.shortName = shortname;
-    }
-
-    public static WriterVersion fromString(String name) {
-      for (WriterVersion v : WriterVersion.values()) {
-        if (v.shortName.equals(name)) {
-          return v;
-        }
-      }
-      // Not a short name: valueOf throws IllegalArgumentException unless name exactly matches an enum constant name
-      return WriterVersion.valueOf(name);
-    }
-  }
-  private final int dictionaryPageSizeThreshold;
-  private final WriterVersion writerVersion;
-  private final boolean enableDictionary;
-
-  public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
-    this.dictionaryPageSizeThreshold = dictPageSize;
-    this.writerVersion = writerVersion;
-    this.enableDictionary = enableDict;
-  }
-
-  public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
-    if (maxLevel == 0) {
-      return new DevNullValuesWriter();
-    } else {
-      return new RunLengthBitPackingHybridValuesWriter(
-          getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
-    }
-  }
-
-  private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
-    switch (path.getType()) {
-    case BOOLEAN:
-      return new BooleanPlainValuesWriter();
-    case INT96:
-      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
-    case BINARY:
-    case INT32:
-    case INT64:
-    case DOUBLE:
-    case FLOAT:
-      return new PlainValuesWriter(initialSizePerCol, pageSize);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) { // NOTE(review): initialSizePerCol is not used in this method
-    Encoding encodingForDataPage;
-    Encoding encodingForDictionaryPage;
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      encodingForDataPage = PLAIN_DICTIONARY;
-      encodingForDictionaryPage = PLAIN_DICTIONARY;
-      break;
-    case PARQUET_2_0:
-      encodingForDataPage = RLE_DICTIONARY;
-      encodingForDictionaryPage = PLAIN;
-      break;
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-    switch (path.getType()) {
-    case BOOLEAN:
-      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
-    case BINARY:
-      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
-    case INT32:
-      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
-    case INT64:
-      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
-    case INT96:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
-    case DOUBLE:
-      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
-    case FLOAT:
-      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      return plainWriter(path, initialSizePerCol, pageSize);
-    case PARQUET_2_0:
-      switch (path.getType()) {
-      case BOOLEAN:
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-        return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
-      case INT32:
-        return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
-      case INT96:
-      case INT64:
-      case DOUBLE:
-      case FLOAT:
-        return plainWriter(path, initialSizePerCol, pageSize);
-      default:
-        throw new IllegalArgumentException("Unknown type " + path.getType());
-      }
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-  }
-
-  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
-    ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
-    if (enableDictionary) {
-      return FallbackValuesWriter.of(
-          dictionaryWriter(path, initialSizePerCol),
-          writerToFallBackTo);
-    } else {
-     return writerToFallBackTo;
-    }
-  }
-
-  public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
-    switch (path.getType()) {
-    case BOOLEAN: // no dictionary encoding for boolean
-      return writerToFallbackTo(path, initialSizePerCol, pageSize);
-    case FIXED_LEN_BYTE_ARRAY:
-      // dictionary encoding for that type was not enabled in PARQUET 1.0
-      if (writerVersion == WriterVersion.PARQUET_2_0) {
-        return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
-      } else {
-       return writerToFallbackTo(path, initialSizePerCol, pageSize);
-      }
-    case BINARY:
-    case INT32:
-    case INT64:
-    case INT96:
-    case DOUBLE:
-    case FLOAT:
-      return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  public int getDictionaryPageSizeThreshold() {
-    return dictionaryPageSizeThreshold;
-  }
-
-  public WriterVersion getWriterVersion() {
-    return writerVersion;
-  }
-
-  public boolean isEnableDictionary() {
-    return enableDictionary;
-  }
-
-  public ColumnWriteStore newColumnWriteStore(
-      MessageType schema,
-      PageWriteStore pageStore,
-      int pageSize) {
-    switch (writerVersion) {
-    case PARQUET_1_0:
-      return new ColumnWriteStoreV1(
-          pageStore,
-          pageSize,
-          dictionaryPageSizeThreshold,
-          enableDictionary, writerVersion);
-    case PARQUET_2_0:
-      return new ColumnWriteStoreV2(
-          schema,
-          pageStore,
-          pageSize,
-          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary)); // new instance built from this object's three settings
-    default:
-      throw new IllegalArgumentException("unknown version " + writerVersion);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/UnknownColumnException.java b/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
deleted file mode 100644
index c3b7dc8..0000000
--- a/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import parquet.ParquetRuntimeException;
-
-/**
- * Thrown if the specified column is unknown in the underlying storage;
- * the offending descriptor is available through {@link #getDescriptor()}.
- * @author Julien Le Dem
- */
-public class UnknownColumnException extends ParquetRuntimeException {
-  private static final long serialVersionUID = 1L;
-
-  private final ColumnDescriptor descriptor;
-
-  public UnknownColumnException(ColumnDescriptor descriptor) {
-    super("Column not found: " + descriptor.toString()); // NOTE(review): a null descriptor fails here with NullPointerException
-    this.descriptor = descriptor;
-  }
-
-  public ColumnDescriptor getDescriptor() {
-    return descriptor;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java b/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
deleted file mode 100644
index 32b3016..0000000
--- a/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import parquet.ParquetRuntimeException;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Thrown if the specified column type is unknown in the underlying storage;
- * the offending type is available through {@link #getType()}.
- * @author  Katya Gonina
- */
-public class UnknownColumnTypeException extends ParquetRuntimeException {
-  private static final long serialVersionUID = 1L;
-
-  private final PrimitiveTypeName type;
-
-  public UnknownColumnTypeException(PrimitiveTypeName type) {
-    super("Column type not found: " + type.toString()); // NOTE(review): a null type fails here with NullPointerException
-    this.type= type;
-  }
-
-  public PrimitiveTypeName getType() {
-    return this.type;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/ValuesType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ValuesType.java b/parquet-column/src/main/java/parquet/column/ValuesType.java
deleted file mode 100644
index f6e9322..0000000
--- a/parquet-column/src/main/java/parquet/column/ValuesType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-/**
- * The different types of values we can store in columns
- *
- * @author Julien Le Dem
- *
- */
-public enum ValuesType {
-  REPETITION_LEVEL, DEFINITION_LEVEL, VALUES;
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
deleted file mode 100644
index a98919f..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnReadStore;
-import parquet.column.ColumnReader;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-/**
- * Implementation of the ColumnReadStore
- *
- * Initializes individual columns based on schema and converter
- *
- * @author Julien Le Dem
- *
- */
-public class ColumnReadStoreImpl implements ColumnReadStore {
-
-  private final PageReadStore pageReadStore;
-  private final GroupConverter recordConverter;
-  private final MessageType schema;
-
-  /**
-   * @param pageReadStore underlying page storage
-   * @param recordConverter the user provided converter to materialize records
-   * @param schema the schema we are reading
-   */
-  public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordConverter, MessageType schema) {
-    super();
-    this.pageReadStore = pageReadStore;
-    this.recordConverter = recordConverter;
-    this.schema = schema;
-  }
-
-  @Override
-  public ColumnReader getColumnReader(ColumnDescriptor path) {
-    return newMemColumnReader(path, pageReadStore.getPageReader(path));
-  }
-
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
-    PrimitiveConverter converter = getPrimitiveConverter(path);
-    return new ColumnReaderImpl(path, pageReader, converter);
-  }
-
-  private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
-    Type currentType = schema;
-    Converter currentConverter = recordConverter;
-    for (String fieldName : path.getPath()) { // descend the schema and the converter tree in parallel along the column path
-      final GroupType groupType = currentType.asGroupType();
-      int fieldIndex = groupType.getFieldIndex(fieldName);
-      currentType = groupType.getType(fieldName);
-      currentConverter = currentConverter.asGroupConverter().getConverter(fieldIndex); // child converter is looked up by field index
-    }
-    PrimitiveConverter converter = currentConverter.asPrimitiveConverter(); // the last path element is expected to be a primitive
-    return converter;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
deleted file mode 100644
index e2a6a3a..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ /dev/null
@@ -1,661 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.column.ValuesType.DEFINITION_LEVEL;
-import static parquet.column.ValuesType.REPETITION_LEVEL;
-import static parquet.column.ValuesType.VALUES;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnReader;
-import parquet.column.Dictionary;
-import parquet.column.Encoding;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReader;
-import parquet.column.values.ValuesReader;
-import parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.Binary;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
-
-/**
- * ColumnReader implementation
- *
- * @author Julien Le Dem
- *
- */
-class ColumnReaderImpl implements ColumnReader {
-  private static final Log LOG = Log.getLog(ColumnReaderImpl.class);
-
-  /**
-   * binds the lower level page decoder to the record converter materializing the records
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private static abstract class Binding {
-
-    /**
-     * read one value from the underlying page
-     */
-    abstract void read();
-
-    /**
-     * skip one value from the underlying page
-     */
-    abstract void skip();
-
-    /**
-     * write current value to converter
-     */
-    abstract void writeValue();
-
-    /**
-     * @return current value
-     */
-    public int getDictionaryId() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public int getInteger() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public boolean getBoolean() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public long getLong() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public Binary getBinary() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public float getFloat() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public double getDouble() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private final ColumnDescriptor path;
-  private final long totalValueCount;
-  private final PageReader pageReader;
-  private final Dictionary dictionary;
-
-  private IntIterator repetitionLevelColumn;
-  private IntIterator definitionLevelColumn;
-  protected ValuesReader dataColumn;
-
-  private int repetitionLevel;
-  private int definitionLevel;
-  private int dictionaryId;
-
-  private long endOfPageValueCount;
-  private int readValues;
-  private int pageValueCount;
-
-  private final PrimitiveConverter converter;
-  private Binding binding;
-
-  // this is needed because we will attempt to read the value twice when filtering
-  // TODO: rework that
-  private boolean valueRead;
-
-  private void bindToDictionary(final Dictionary dictionary) {
-    binding =
-        new Binding() {
-          void read() {
-            dictionaryId = dataColumn.readValueDictionaryId();
-          }
-          public void skip() {
-            dataColumn.skip();
-          }
-          public int getDictionaryId() {
-            return dictionaryId;
-          }
-          void writeValue() {
-            converter.addValueFromDictionary(dictionaryId);
-          }
-          public int getInteger() {
-            return dictionary.decodeToInt(dictionaryId);
-          }
-          public boolean getBoolean() {
-            return dictionary.decodeToBoolean(dictionaryId);
-          }
-          public long getLong() {
-            return dictionary.decodeToLong(dictionaryId);
-          }
-          public Binary getBinary() {
-            return dictionary.decodeToBinary(dictionaryId);
-          }
-          public float getFloat() {
-            return dictionary.decodeToFloat(dictionaryId);
-          }
-          public double getDouble() {
-            return dictionary.decodeToDouble(dictionaryId);
-          }
-        };
-  }
-
-  private void bind(PrimitiveTypeName type) {
-    binding = type.convert(new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
-      @Override
-      public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          float current;
-          void read() {
-            current = dataColumn.readFloat();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          public float getFloat() {
-            return current;
-          }
-          void writeValue() {
-            converter.addFloat(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          double current;
-          void read() {
-            current = dataColumn.readDouble();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          public double getDouble() {
-            return current;
-          }
-          void writeValue() {
-            converter.addDouble(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          int current;
-          void read() {
-            current = dataColumn.readInteger();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          @Override
-          public int getInteger() {
-            return current;
-          }
-          void writeValue() {
-            converter.addInt(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          long current;
-          void read() {
-            current = dataColumn.readLong();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          @Override
-          public long getLong() {
-            return current;
-          }
-          void writeValue() {
-            converter.addLong(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return this.convertBINARY(primitiveTypeName);
-      }
-      @Override
-      public Binding convertFIXED_LEN_BYTE_ARRAY(
-          PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return this.convertBINARY(primitiveTypeName);
-      }
-      @Override
-      public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          boolean current;
-          void read() {
-            current = dataColumn.readBoolean();
-          }
-          public void skip() {
-            current = false;
-            dataColumn.skip();
-          }
-          @Override
-          public boolean getBoolean() {
-            return current;
-          }
-          void writeValue() {
-            converter.addBoolean(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          Binary current;
-          void read() {
-            current = dataColumn.readBytes();
-          }
-          public void skip() {
-            current = null;
-            dataColumn.skip();
-          }
-          @Override
-          public Binary getBinary() {
-            return current;
-          }
-          void writeValue() {
-            converter.addBinary(current);
-          }
-        };
-      }
-    });
-  }
-
-  /**
-   * creates a reader for triplets
-   * @param path the descriptor for the corresponding column
-   * @param pageReader the underlying store to read from
-   */
-  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter) {
-    this.path = checkNotNull(path, "path");
-    this.pageReader = checkNotNull(pageReader, "pageReader");
-    this.converter = checkNotNull(converter, "converter");
-    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-    if (dictionaryPage != null) {
-      try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
-        if (converter.hasDictionarySupport()) {
-          converter.setDictionary(dictionary);
-        }
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
-      }
-    } else {
-      this.dictionary = null;
-    }
-    this.totalValueCount = pageReader.getTotalValueCount();
-    if (totalValueCount == 0) {
-      throw new ParquetDecodingException("totalValueCount == 0");
-    }
-    consume();
-  }
-
-  private boolean isFullyConsumed() {
-    return readValues >= totalValueCount;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#writeCurrentValueToConverter()
-   */
-  @Override
-  public void writeCurrentValueToConverter() {
-    readValue();
-    this.binding.writeValue();
-  }
-
-  @Override
-  public int getCurrentValueDictionaryID() {
-    readValue();
-    return binding.getDictionaryId();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getInteger()
-   */
-  @Override
-  public int getInteger() {
-    readValue();
-    return this.binding.getInteger();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getBoolean()
-   */
-  @Override
-  public boolean getBoolean() {
-    readValue();
-    return this.binding.getBoolean();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getLong()
-   */
-  @Override
-  public long getLong() {
-    readValue();
-    return this.binding.getLong();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getBinary()
-   */
-  @Override
-  public Binary getBinary() {
-    readValue();
-    return this.binding.getBinary();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getFloat()
-   */
-  @Override
-  public float getFloat() {
-    readValue();
-    return this.binding.getFloat();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getDouble()
-   */
-  @Override
-  public double getDouble() {
-    readValue();
-    return this.binding.getDouble();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getCurrentRepetitionLevel()
-   */
-  @Override
-  public int getCurrentRepetitionLevel() {
-    return repetitionLevel;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getDescriptor()
-   */
-  @Override
-  public ColumnDescriptor getDescriptor() {
-    return path;
-  }
-
-  /**
-   * Reads the value into the binding.
-   */
-  public void readValue() {
-    try {
-      if (!valueRead) {
-        binding.read();
-        valueRead = true;
-      }
-    } catch (RuntimeException e) {
-      throw new ParquetDecodingException(
-          format(
-              "Can't read value in column %s at value %d out of %d, %d out of %d in currentPage. repetition level: %d, definition level: %d",
-              path, readValues, totalValueCount, readValues - (endOfPageValueCount - pageValueCount), pageValueCount, repetitionLevel, definitionLevel),
-          e);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#skip()
-   */
-  @Override
-  public void skip() {
-    if (!valueRead) {
-      binding.skip();
-      valueRead = true;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getCurrentDefinitionLevel()
-   */
-  @Override
-  public int getCurrentDefinitionLevel() {
-    return definitionLevel;
-  }
-
-  // TODO: change the logic around read() to not tie together reading from the 3 columns
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    ++readValues;
-  }
-
-  private void checkRead() {
-    if (isPageFullyConsumed()) {
-      if (isFullyConsumed()) {
-        if (DEBUG) LOG.debug("end reached");
-        repetitionLevel = 0; // the next repetition level
-        return;
-      }
-      readPage();
-    }
-    readRepetitionAndDefinitionLevels();
-  }
-
-  private void readPage() {
-    if (DEBUG) LOG.debug("loading page");
-    DataPage page = pageReader.readPage();
-    page.accept(new DataPage.Visitor<Void>() {
-      @Override
-      public Void visit(DataPageV1 dataPageV1) {
-        readPageV1(dataPageV1);
-        return null;
-      }
-      @Override
-      public Void visit(DataPageV2 dataPageV2) {
-        readPageV2(dataPageV2);
-        return null;
-      }
-    });
-  }
-
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
-    this.pageValueCount = valueCount;
-    this.endOfPageValueCount = readValues + pageValueCount;
-    if (dataEncoding.usesDictionary()) {
-      if (dictionary == null) {
-        throw new ParquetDecodingException(
-            "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
-      }
-      this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
-    } else {
-      this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
-    }
-    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
-      bindToDictionary(dictionary);
-    } else {
-      bind(path.getType());
-    }
-    try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page in col " + path, e);
-    }
-  }
-
-  private void readPageV1(DataPageV1 page) {
-    ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
-    ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
-    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-    try {
-      byte[] bytes = page.getBytes().toByteArray();
-      if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
-      if (DEBUG) LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      if (DEBUG) LOG.debug("reading definition levels at " + next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      if (DEBUG) LOG.debug("reading data at " + next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
-    }
-  }
-
-  private void readPageV2(DataPageV2 page) {
-    this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
-    this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    try {
-      if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
-    }
-  }
-
-  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-    try {
-      if (maxLevel == 0) {
-        return new NullIntIterator();
-      }
-      return new RLEIntIterator(
-          new RunLengthBitPackingHybridDecoder(
-              BytesUtils.getWidthFromMaxInt(maxLevel),
-              new ByteArrayInputStream(bytes.toByteArray())));
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read levels in page for col " + path, e);
-    }
-  }
-
-  private boolean isPageFullyConsumed() {
-    return readValues >= endOfPageValueCount;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#consume()
-   */
-  @Override
-  public void consume() {
-    checkRead();
-    valueRead = false;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see parquet.column.ColumnReader#getTotalValueCount()
-   */
-  @Override
-  public long getTotalValueCount() {
-    return totalValueCount;
-  }
-
-  static abstract class IntIterator {
-    abstract int nextInt();
-  }
-
-  static class ValuesReaderIntIterator extends IntIterator {
-    ValuesReader delegate;
-
-    public ValuesReaderIntIterator(ValuesReader delegate) {
-      super();
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      return delegate.readInteger();
-    }
-  }
-
-  static class RLEIntIterator extends IntIterator {
-    RunLengthBitPackingHybridDecoder delegate;
-
-    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      try {
-        return delegate.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
-  }
-
-  private static final class NullIntIterator extends IntIterator {
-    @Override
-    int nextInt() {
-      return 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
deleted file mode 100644
index 2c476ca..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-
-public class ColumnWriteStoreV1 implements ColumnWriteStore {
-
-  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
-  private final PageWriteStore pageWriteStore;
-  private final int pageSizeThreshold;
-  private final int dictionaryPageSizeThreshold;
-  private final boolean enableDictionary;
-  private final WriterVersion writerVersion;
-
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
-    super();
-    this.pageWriteStore = pageWriteStore;
-    this.pageSizeThreshold = pageSizeThreshold;
-    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
-    this.enableDictionary = enableDictionary;
-    this.writerVersion = writerVersion;
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    ColumnWriterV1 column = columns.get(path);
-    if (column == null) {
-      column = newMemColumn(path);
-      columns.put(path, column);
-    }
-    return column;
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
-    PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.getBufferedSizeInMemory();
-    }
-    return total;
-  }
-
-  @Override
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
-  }
-
-  public long maxColMemSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long max = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      max = Math.max(max, memColumn.getBufferedSizeInMemory());
-    }
-    return max;
-  }
-
-  @Override
-  public void flush() {
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      memColumn.flush();
-    }
-  }
-
-  @Override
-  public void endRecord() {
-    // V1 does not take record boundaries into account
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
deleted file mode 100644
index 2dc342e..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.util.Collections.unmodifiableMap;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.schema.MessageType;
-
-public class ColumnWriteStoreV2 implements ColumnWriteStore {
-
-  // will wait for at least that many records before checking again
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-  // will flush even if size bellow the threshold by this much to facilitate page alignment
-  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
-
-  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
-  private final Collection<ColumnWriterV2> writers;
-  private long rowCount;
-  private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-  private final long thresholdTolerance;
-
-  private int pageSizeThreshold;
-
-  public ColumnWriteStoreV2(
-      MessageType schema,
-      PageWriteStore pageWriteStore,
-      int pageSizeThreshold,
-      ParquetProperties parquetProps) {
-    super();
-    this.pageSizeThreshold = pageSizeThreshold;
-    this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
-    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
-    for (ColumnDescriptor path : schema.getColumns()) {
-      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
-    }
-    this.columns = unmodifiableMap(mcolumns);
-    this.writers = this.columns.values();
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    return columns.get(path);
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.getTotalBufferedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public void flush() {
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      long rows = rowCount - memColumn.getRowsWrittenSoFar();
-      if (rows > 0) {
-        memColumn.writePage(rowCount);
-      }
-      memColumn.finalizeColumnChunk();
-    }
-  }
-
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
-  }
-
-  @Override
-  public void endRecord() {
-    ++ rowCount;
-    if (rowCount >= rowCountForNextSizeCheck) {
-      sizeCheck();
-    }
-  }
-
-  private void sizeCheck() {
-    long minRecordToWait = Long.MAX_VALUE;
-    for (ColumnWriterV2 writer : writers) {
-      long usedMem = writer.getCurrentPageBufferedSize();
-      long rows = rowCount - writer.getRowsWrittenSoFar();
-      long remainingMem = pageSizeThreshold - usedMem;
-      if (remainingMem <= thresholdTolerance) {
-        writer.writePage(rowCount);
-        remainingMem = pageSizeThreshold;
-      }
-      long rowsToFillPage =
-          usedMem == 0 ?
-              MAXIMUM_RECORD_COUNT_FOR_CHECK
-              : (long)((float)rows) / usedMem * remainingMem;
-      if (rowsToFillPage < minRecordToWait) {
-        minRecordToWait = rowsToFillPage;
-      }
-    }
-    if (minRecordToWait == Long.MAX_VALUE) {
-      minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
-    }
-    // will check again halfway
-    rowCountForNextSizeCheck = rowCount +
-        min(
-            max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
-            MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
deleted file mode 100644
index ce4af80..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static parquet.bytes.BytesInput.concat;
-
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-
-/**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
- *
- * @author Julien Le Dem
- *
- */
-final class ColumnWriterV1 implements ColumnWriter {
-  private static final Log LOG = Log.getLog(ColumnWriterV1.class);
-  private static final boolean DEBUG = Log.DEBUG;
-  private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
-  private static final int MIN_SLAB_SIZE = 64;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private final long pageSizeThreshold;
-  private ValuesWriter repetitionLevelColumn;
-  private ValuesWriter definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-  private int valueCountForNextSizeCheck;
-
-  private Statistics statistics;
-
-  /**
-   * @param path the column this writer buffers values for
-   * @param pageWriter where finished pages and the dictionary page are written
-   * @param pageSizeThreshold buffered size (in bytes) above which the current page is written out
-   * @param dictionaryPageSizeThreshold passed on to {@link ParquetProperties}
-   * @param enableDictionary passed on to {@link ParquetProperties}
-   * @param writerVersion passed on to {@link ParquetProperties}
-   */
-  public ColumnWriterV1(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      int pageSizeThreshold,
-      int dictionaryPageSizeThreshold,
-      boolean enableDictionary,
-      WriterVersion writerVersion) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    this.pageSizeThreshold = pageSizeThreshold;
-    // the first memory check only happens once there is enough data to extrapolate from
-    this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
-    resetStatistics();
-
-    final ParquetProperties props =
-        new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
-
-    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(
-        path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
-    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(
-        path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
-
-    this.dataColumn = props.getValuesWriter(
-        path,
-        CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10),
-        pageSizeThreshold);
-  }
-
-  /** Emits one debug line describing a written triplet. */
-  private void log(Object value, int r, int d) {
-    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
-  }
-
-  /** Replaces the statistics with a fresh instance matching the column type. */
-  private void resetStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
-  }
-
-  /** Appends the repetition level, then the definition level, of the current triplet. */
-  private void writeLevels(int repetitionLevel, int definitionLevel) {
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-  }
-
-  /** @return the bytes buffered for the page being built (levels and data) */
-  private long currentPageBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize();
-  }
-
-  /**
-   * Registers one more written value and, when the value count passes the next scheduled
-   * check point, measures the buffered size.
-   *
-   * If the size is above the page threshold the page is written and the next check is set to
-   * half of the count just reached; otherwise the next check is placed midway between the
-   * current count and the count at which the threshold is predicted to be hit.
-   */
-  private void accountForValueWritten() {
-    valueCount += 1;
-    if (valueCount <= valueCountForNextSizeCheck) {
-      // the memory used is deliberately not measured for every value
-      return;
-    }
-    final long bufferedBytes = currentPageBufferedSize();
-    if (bufferedBytes > pageSizeThreshold) {
-      // flush, then look again around the predicted middle of the next page
-      valueCountForNextSizeCheck = valueCount / 2;
-      writePage();
-    } else {
-      // threshold not reached yet: look again midway to the predicted count
-      valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / bufferedBytes)) / 2 + 1;
-    }
-  }
-
-  /**
-   * Hands the buffered levels and data to the page writer as one page, then clears the
-   * buffers, the value count and the statistics.
-   */
-  private void writePage() {
-    if (DEBUG) {
-      LOG.debug("write page");
-    }
-    try {
-      pageWriter.writePage(
-          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
-          valueCount,
-          statistics,
-          repetitionLevelColumn.getEncoding(),
-          definitionLevelColumn.getEncoding(),
-          dataColumn.getEncoding());
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
-  }
-
-  @Override
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(null, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    statistics.incrementNumNulls();
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeDouble(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeFloat(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeBytes(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeBoolean(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeInteger(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) {
-      log(value, repetitionLevel, definitionLevel);
-    }
-    writeLevels(repetitionLevel, definitionLevel);
-    dataColumn.writeLong(value);
-    statistics.updateStats(value);
-    accountForValueWritten();
-  }
-
-  /**
-   * Writes the pending page if it holds any value, then writes the dictionary page when the
-   * data column provides one and resets that dictionary.
-   */
-  public void flush() {
-    if (valueCount > 0) {
-      writePage();
-    }
-    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
-    if (dictionaryPage == null) {
-      return;
-    }
-    if (DEBUG) {
-      LOG.debug("write dictionary");
-    }
-    try {
-      pageWriter.writeDictionaryPage(dictionaryPage);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-    }
-    dataColumn.resetDictionary();
-  }
-
-  /**
-   * @return the buffered size of the levels and data plus the size reported by the page writer
-   */
-  public long getBufferedSizeInMemory() {
-    return currentPageBufferedSize() + pageWriter.getMemSize();
-  }
-
-  /**
-   * @return the allocated size of the levels and data plus the one reported by the page writer
-   */
-  public long allocatedSize() {
-    final long levels = repetitionLevelColumn.getAllocatedSize()
-        + definitionLevelColumn.getAllocatedSize();
-    return levels
-        + dataColumn.getAllocatedSize()
-        + pageWriter.allocatedSize();
-  }
-
-  /**
-   * @param indent prepended to every line of the report
-   * @return a multi-line report of the memory usage of each part of this writer
-   */
-  public String memUsageString(String indent) {
-    final StringBuilder report = new StringBuilder(indent);
-    report.append(path).append(" {\n");
-    report.append(repetitionLevelColumn.memUsageString(indent + "  r:"));
-    report.append("\n");
-    report.append(definitionLevelColumn.memUsageString(indent + "  d:"));
-    report.append("\n");
-    report.append(dataColumn.memUsageString(indent + "  data:"));
-    report.append("\n");
-    report.append(pageWriter.memUsageString(indent + "  pages:"));
-    report.append("\n");
-    report.append(indent);
-    report.append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize()));
-    report.append("\n");
-    report.append(indent).append("}\n");
-    return report.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
deleted file mode 100644
index e7b8c1c..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
-
-import java.io.IOException;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.Encoding;
-import parquet.column.ParquetProperties;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
-/**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
- *
- * @author Julien Le Dem
- *
- */
-final class ColumnWriterV2 implements ColumnWriter {
-  private static final Log LOG = Log.getLog(ColumnWriterV2.class);
-  private static final boolean DEBUG = Log.DEBUG;
-  private static final int MIN_SLAB_SIZE = 64;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
-  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-
-  private Statistics<?> statistics;
-  private long rowsWrittenSoFar = 0;
-
-  public ColumnWriterV2(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      ParquetProperties parquetProps,
-      int pageSize) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    resetStatistics();
-
-    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
-    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
-
-    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
-  }
-
-  private void log(Object value, int r, int d) {
-    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
-  }
-
-  private void definitionLevel(int definitionLevel) {
-    try {
-      definitionLevelColumn.writeInt(definitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
-    }
-  }
-
-  private void repetitionLevel(int repetitionLevel) {
-    try {
-      repetitionLevelColumn.writeInt(repetitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
-    }
-  }
-
-  /**
-   * writes the current null value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    statistics.incrementNumNulls();
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeDouble(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeFloat(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBytes(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBoolean(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeInteger(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeLong(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
-   * Is called right after writePage
-   */
-  public void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-      }
-      dataColumn.resetDictionary();
-    }
-  }
-
-  /**
-   * used to decide when to write a page
-   * @return the number of bytes of memory used to buffer the current data
-   */
-  public long getCurrentPageBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize();
-  }
-
-  /**
-   * used to decide when to write a page or row group
-   * @return the number of bytes of memory used to buffer the current data and the previously written pages
-   */
-  public long getTotalBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
-
-  /**
-   * @return actual memory used
-   */
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
-  }
-
-  /**
-   * @param prefix a prefix to format lines
-   * @return a formatted string showing how memory is used
-   */
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
-  }
-
-  public long getRowsWrittenSoFar() {
-    return this.rowsWrittenSoFar;
-  }
-
-  /**
-   * writes the current data to a new page in the page store
-   * @param rowCount how many rows have been written so far
-   */
-  public void writePage(long rowCount) {
-    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
-    this.rowsWrittenSoFar = rowCount;
-    if (DEBUG) LOG.debug("write page");
-    try {
-      // TODO: rework this API. Those must be called *in that order*
-      BytesInput bytes = dataColumn.getBytes();
-      Encoding encoding = dataColumn.getEncoding();
-      pageWriter.writePageV2(
-          pageRowCount,
-          Ints.checkedCast(statistics.getNumNulls()),
-          valueCount,
-          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
-          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
-          encoding,
-          bytes,
-          statistics
-          );
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPage.java b/parquet-column/src/main/java/parquet/column/page/DataPage.java
deleted file mode 100644
index 8043fd0..0000000
--- a/parquet-column/src/main/java/parquet/column/page/DataPage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.page;
-
-/**
- * one data page in a chunk
- *
- * @author Julien Le Dem
- *
- */
-public abstract class DataPage extends Page {
-
-  private final int valueCount;
-
-  /**
-   * @param compressedSize forwarded to the {@code Page} constructor
-   * @param uncompressedSize forwarded to the {@code Page} constructor
-   * @param valueCount the number of values held by this page
-   */
-  DataPage(int compressedSize, int uncompressedSize, int valueCount) {
-    super(compressedSize, uncompressedSize);
-    this.valueCount = valueCount;
-  }
-
-  /**
-   * @return how many values this page contains
-   */
-  public int getValueCount() {
-    return this.valueCount;
-  }
-
-  /**
-   * @param visitor the visitor to hand this page to
-   * @return whatever the visitor returns
-   */
-  public abstract <T> T accept(Visitor<T> visitor);
-
-  /**
-   * Callback with one method per data page version.
-   *
-   * @param <T> the type of the result of a visit
-   */
-  public interface Visitor<T> {
-
-    T visit(DataPageV1 dataPageV1);
-
-    T visit(DataPageV2 dataPageV2);
-
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
deleted file mode 100644
index 867bb4a..0000000
--- a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.page;
-
-import parquet.Ints;
-import parquet.bytes.BytesInput;
-import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
-
-public class DataPageV1 extends DataPage {
-
-  private final BytesInput bytes;
-  private final Statistics<?> statistics;
-  private final Encoding rlEncoding;
-  private final Encoding dlEncoding;
-  private final Encoding valuesEncoding;
-
-  /**
-   * The size of {@code bytes} is passed to the parent as the compressed size of the page.
-   *
-   * @param bytes the bytes for this page
-   * @param valueCount count of values in this page
-   * @param uncompressedSize the uncompressed size of the page
-   * @param stats the statistics of the page's values (max, min, num_null)
-   * @param rlEncoding the repetition level encoding for this page
-   * @param dlEncoding the definition level encoding for this page
-   * @param valuesEncoding the values encoding for this page
-   */
-  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
-    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
-    this.bytes = bytes;
-    this.statistics = stats;
-    this.rlEncoding = rlEncoding;
-    this.dlEncoding = dlEncoding;
-    this.valuesEncoding = valuesEncoding;
-  }
-
-  /**
-   * @return the bytes this page was created with
-   */
-  public BytesInput getBytes() {
-    return this.bytes;
-  }
-
-  /**
-   * @return the statistics this page was created with (max, min, num_nulls)
-   */
-  public Statistics<?> getStatistics() {
-    return this.statistics;
-  }
-
-  /**
-   * @return the encoding of the definition levels of this page
-   */
-  public Encoding getDlEncoding() {
-    return this.dlEncoding;
-  }
-
-  /**
-   * @return the encoding of the repetition levels of this page
-   */
-  public Encoding getRlEncoding() {
-    return this.rlEncoding;
-  }
-
-  /**
-   * @return the encoding of the values of this page
-   */
-  public Encoding getValueEncoding() {
-    return this.valuesEncoding;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder("Page [bytes.size=")
-        .append(bytes.size())
-        .append(", valueCount=")
-        .append(getValueCount())
-        .append(", uncompressedSize=")
-        .append(getUncompressedSize())
-        .append("]")
-        .toString();
-  }
-
-  /**
-   * Calls the {@code DataPageV1} overload of {@code visit} on the visitor.
-   */
-  @Override
-  public <T> T accept(Visitor<T> visitor) {
-    return visitor.visit(this);
-  }
-}