You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:39 UTC

[42/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
new file mode 100644
index 0000000..9d94439
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class DoubleStatistics extends Statistics<Double> {
+
+  private double max;
+  private double min;
+
+  @Override
+  public void updateStats(double value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    DoubleStatistics doubleStats = (DoubleStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(doubleStats.getMin(), doubleStats.getMax());
+    } else {
+      updateStats(doubleStats.getMin(), doubleStats.getMax());
+    }
+  }
+
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Double.longBitsToDouble(BytesUtils.bytesToLong(maxBytes));
+    min = Double.longBitsToDouble(BytesUtils.bytesToLong(minBytes));
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.longToBytes(Double.doubleToLongBits(max));
+  }
+
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.longToBytes(Double.doubleToLongBits(min));
+  }
+
+  @Override
+  public String toString() {
+    if(this.hasNonNullValue())
+      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
+    else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    else
+      return "no stats for this column";
+  }
+
+  public void updateStats(double min_value, double max_value) {
+    if (min_value < min) { min = min_value; }
+    if (max_value > max) { max = max_value; }
+  }
+
+  public void initializeStats(double min_value, double max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+
+  @Override
+  public Double genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Double genericGetMax() {
+    return max;
+  }
+
+  public double getMax() {
+    return max;
+  }
+
+  public double getMin() {
+    return min;
+  }
+
+  public void setMinMax(double min, double max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
new file mode 100644
index 0000000..c164cf5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class FloatStatistics extends Statistics<Float> {
+
+  private float max;
+  private float min;
+
+  @Override
+  public void updateStats(float value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    FloatStatistics floatStats = (FloatStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(floatStats.getMin(), floatStats.getMax());
+    } else {
+      updateStats(floatStats.getMin(), floatStats.getMax());
+    }
+  }
+
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Float.intBitsToFloat(BytesUtils.bytesToInt(maxBytes));
+    min = Float.intBitsToFloat(BytesUtils.bytesToInt(minBytes));
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.intToBytes(Float.floatToIntBits(max));
+  }
+
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.intToBytes(Float.floatToIntBits(min));
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
+    else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    else
+      return "no stats for this column";
+  }
+
+  public void updateStats(float min_value, float max_value) {
+    if (min_value < min) { min = min_value; }
+    if (max_value > max) { max = max_value; }
+  }
+
+  public void initializeStats(float min_value, float max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+
+  @Override
+  public Float genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Float genericGetMax() {
+    return max;
+  }
+
+  public float getMax() {
+    return max;
+  }
+
+  public float getMin() {
+    return min;
+  }
+
+  public void setMinMax(float min, float max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
new file mode 100644
index 0000000..8deb28a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class IntStatistics extends Statistics<Integer> {
+
+  private int max;
+  private int min;
+
+  @Override
+  public void updateStats(int value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    IntStatistics intStats = (IntStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(intStats.getMin(), intStats.getMax());
+    } else {
+      updateStats(intStats.getMin(), intStats.getMax());
+    }
+  }
+
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = BytesUtils.bytesToInt(maxBytes);
+    min = BytesUtils.bytesToInt(minBytes);
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.intToBytes(max);
+  }
+
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.intToBytes(min);
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
+    else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max is not defined", this.getNumNulls());
+    else
+      return "no stats for this column";
+  }
+
+  public void updateStats(int min_value, int max_value) {
+    if (min_value < min) { min = min_value; }
+    if (max_value > max) { max = max_value; }
+  }
+
+  public void initializeStats(int min_value, int max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+
+  @Override
+  public Integer genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Integer genericGetMax() {
+    return max;
+  }
+
+  public int getMax() {
+    return max;
+  }
+
+  public int getMin() {
+    return min;
+  }
+
+  public void setMinMax(int min, int max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
new file mode 100644
index 0000000..a8c177e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class LongStatistics extends Statistics<Long> {
+
+  private long max;
+  private long min;
+
+  @Override
+  public void updateStats(long value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    LongStatistics longStats = (LongStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(longStats.getMin(), longStats.getMax());
+    } else {
+      updateStats(longStats.getMin(), longStats.getMax());
+    }
+  }
+
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = BytesUtils.bytesToLong(maxBytes);
+    min = BytesUtils.bytesToLong(minBytes);
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.longToBytes(max);
+  }
+
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.longToBytes(min);
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
+    else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    else
+      return "no stats for this column";
+  }
+
+  public void updateStats(long min_value, long max_value) {
+    if (min_value < min) { min = min_value; }
+    if (max_value > max) { max = max_value; }
+  }
+
+  public void initializeStats(long min_value, long max_value) {
+      min = min_value;
+      max = max_value;
+      this.markAsNotEmpty();
+  }
+
+  @Override
+  public Long genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Long genericGetMax() {
+    return max;
+  }
+
+  public long getMax() {
+    return max;
+  }
+
+  public long getMin() {
+    return min;
+  }
+
+  public void setMinMax(long min, long max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
new file mode 100644
index 0000000..ba135f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
@@ -0,0 +1,247 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.column.UnknownColumnTypeException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import java.util.Arrays;
+
+
+/**
+ * Statistics class to keep track of statistics in parquet pages and column chunks
+ *
+ * @author Katya Gonina
+ */
+public abstract class Statistics<T extends Comparable<T>> {
+
+  private boolean hasNonNullValue;
+  private long num_nulls;
+
+  public Statistics() {
+    hasNonNullValue = false;
+    num_nulls = 0;
+  }
+
+  /**
+   * Returns the typed statistics object based on the passed type parameter
+   * @param type PrimitiveTypeName type of the column
+   * @return instance of a typed statistics class
+   */
+  public static Statistics getStatsBasedOnType(PrimitiveTypeName type) {
+    switch(type) {
+    case INT32:
+      return new IntStatistics();
+    case INT64:
+      return new LongStatistics();
+    case FLOAT:
+      return new FloatStatistics();
+    case DOUBLE:
+      return new DoubleStatistics();
+    case BOOLEAN:
+      return new BooleanStatistics();
+    case BINARY:
+      return new BinaryStatistics();
+    case INT96:
+      return new BinaryStatistics();
+    case FIXED_LEN_BYTE_ARRAY:
+      return new BinaryStatistics();
+    default:
+      throw new UnknownColumnTypeException(type);
+    }
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * updates statistics min and max using the passed value
+   * @param value value to use to update min and max
+   */
+  public void updateStats(Binary value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Equality comparison method to compare two statistics objects.
+   * @param stats Statistics object to compare against
+   * @return true if objects are equal, false otherwise
+   */
+  public boolean equals(Statistics stats) {
+    return Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) &&
+           Arrays.equals(stats.getMinBytes(), this.getMinBytes()) &&
+           stats.getNumNulls() == this.getNumNulls();
+  }
+
+  /**
+   * Hash code for the statistics object
+   * @return hash code int
+   */
+  public int hashCode() {
+    return 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes()) + Long.valueOf(this.getNumNulls()).hashCode();
+  }
+
+  /**
+   * Method to merge this statistics object with the object passed
+   * as parameter. Merging keeps the smallest of min values, largest of max
+   * values and combines the number of null counts.
+   * @param stats Statistics object to merge with
+   */
+  public void mergeStatistics(Statistics stats) {
+    if (stats.isEmpty()) return;
+
+    if (this.getClass() == stats.getClass()) {
+      incrementNumNulls(stats.getNumNulls());
+      if (stats.hasNonNullValue()) {
+        mergeStatisticsMinMax(stats);
+        markAsNotEmpty();
+      }
+    } else {
+      throw new StatisticsClassException(this.getClass().toString(), stats.getClass().toString());
+    }
+  }
+
+  /**
+   * Abstract method to merge this statistics min and max with the values
+   * of the parameter object. Does not do any checks, only called internally.
+   * @param stats Statistics object to merge with
+   */
+  abstract protected void mergeStatisticsMinMax(Statistics stats);
+
+  /**
+   * Abstract method to set min and max values from byte arrays.
+   * @param minBytes byte array to set the min value to
+   * @param maxBytes byte array to set the max value to
+   */
+  abstract public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
+
+  abstract public T genericGetMin();
+  abstract public T genericGetMax();
+
+  /**
+   * Abstract method to return the max value as a byte array
+   * @return byte array corresponding to the max value
+   */
+  abstract public byte[] getMaxBytes();
+
+  /**
+   * Abstract method to return the min value as a byte array
+   * @return byte array corresponding to the min value
+   */
+  abstract public byte[] getMinBytes();
+
+  /**
+   * toString() to display min, max, num_nulls in a string
+   */
+  abstract public String toString();
+
+
+  /**
+   * Increments the null count by one
+   */
+  public void incrementNumNulls() {
+    num_nulls++ ;
+  }
+
+  /**
+   * Increments the null count by the parameter value
+   * @param increment value to increment the null count by
+   */
+  public void incrementNumNulls(long increment) {
+    num_nulls += increment ;
+  }
+
+  /**
+   * Returns the null count
+   * @return null count
+   */
+  public long getNumNulls() {
+    return num_nulls;
+  }
+
+  /**
+   * Sets the number of nulls to the parameter value
+   * @param nulls null count to set the count to
+   */
+  public void setNumNulls(long nulls) {
+    num_nulls = nulls;
+  }
+
+  /**
+   * Returns a boolean specifying if the Statistics object is empty,
+   * i.e does not contain valid statistics for the page/column yet
+   * @return true if object is empty, false otherwise
+   */
+  public boolean isEmpty() {
+    return !hasNonNullValue && num_nulls == 0;
+  }
+
+  /**
+   * Returns whether there have been non-null values added to this statistics
+   */
+  public boolean hasNonNullValue() {
+    return hasNonNullValue;
+  }
+ 
+  /**
+   * Sets the page/column as having a valid non-null value
+   * kind of misnomer here
+   */ 
+  protected void markAsNotEmpty() {
+    hasNonNullValue = true;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
new file mode 100644
index 0000000..a242737
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
@@ -0,0 +1,34 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Thrown if the two Statistics objects have mismatching types
+ *
+ * @author  Katya Gonina
+ */
+public class StatisticsClassException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public StatisticsClassException(String className1, String className2) {
+    super("Statistics classes mismatched: " + className1 + " vs. " + className2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
new file mode 100644
index 0000000..f491233
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
@@ -0,0 +1,54 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+
+/**
+ *
+ * Used to add extra behavior to a ValuesWriter that requires fallback
+ * @See {@link FallbackValuesWriter}
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface RequiresFallback {
+
+  /**
+   * In the case of a dictionary based encoding we will fallback if the dictionary becomes too big
+   * @return true to notify the parent that we should fallback to another encoding
+   */
+  boolean shouldFallBack();
+
+  /**
+   * Before writing the first page we will verify if the encoding is worth it.
+   * and fall back if a simpler encoding would be better in that case
+   * @param rawSize the size if encoded with plain
+   * @param encodedSize the size as encoded by the current encoding
+   * @return true if we keep this encoding
+   */
+  boolean isCompressionSatisfying(long rawSize, long encodedSize);
+
+  /**
+   * When falling back to a different encoding we must re-encode all the values seen so far
+   * @param writer the new encoder to write the current values to
+   */
+  void fallBackAllValuesTo(ValuesWriter writer);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
new file mode 100644
index 0000000..a3d8920
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -0,0 +1,126 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import java.io.IOException;
+
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Base class to implement an encoding for a given column type.
+ *
+ * A ValuesReader is provided with a page (byte-array) and is responsible
+ * for deserializing the primitive values stored in that page.
+ *
+ * Given that pages are homogeneous (store only a single type), typical subclasses
+ * will only override one of the read*() methods.
+ *
+ * @author Julien Le Dem
+ */
+public abstract class ValuesReader {
+
+  /**
+   * Called to initialize the column reader from a part of a page.
+   *
+   * The underlying implementation knows how much data to read, so a length
+   * is not provided.
+   *
+   * Each page may contain several sections:
+   * <ul>
+   *  <li> repetition levels column
+   *  <li> definition levels column
+   *  <li> data column
+   * </ul>
+   *
+   * This function is called with 'offset' pointing to the beginning of one of these sections,
+   * and should return the offset to the section following it.
+   *
+   * @param valueCount count of values in this page
+   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
+   * @param offset where to start reading from in the page
+   *
+   * @throws IOException
+   */
+  public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
+  
+  /**
+   * Called to return offset of the next section
+   * @return offset of the next section
+   */
+  public int getNextOffset() {
+    throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
+  }
+
+  /**
+   * usable when the encoding is dictionary based
+   * @return the id of the next value from the page
+   */
+  public int readValueDictionaryId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next boolean from the page
+   */
+  public boolean readBoolean() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next Binary from the page
+   */
+  public Binary readBytes() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next float from the page
+   */
+  public float readFloat() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next double from the page
+   */
+  public double readDouble() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next integer from the page
+   */
+  public int readInteger() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return the next long from the page
+   */
+  public long readLong() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Skips the next value in the page
+   */
+  abstract public void skip();
+}
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
new file mode 100644
index 0000000..c8f31b9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
@@ -0,0 +1,128 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * base class to implement an encoding for a given column
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class ValuesWriter {
+
+  /**
+   * used to decide if we want to work to the next page
+   * @return the size of the currently buffered data (in bytes)
+   */
+  public abstract long getBufferedSize();
+
+
+  // TODO: maybe consolidate into a getPage
+  /**
+   * @return the bytes buffered so far to write to the current page
+   */
+  public abstract BytesInput getBytes();
+
+  /**
+   * called after getBytes() and before reset()
+   * @return the encoding that was used to encode the bytes
+   */
+  public abstract Encoding getEncoding();
+
+  /**
+   * called after getBytes() to reset the current buffer and start writing the next page
+   */
+  public abstract void reset();
+
+  /**
+   * @return the dictionary page or null if not dictionary based
+   */
+  public DictionaryPage createDictionaryPage() {
+    return null;
+  }
+
+  /**
+   * reset the dictionary when a new block starts
+   */
+  public void resetDictionary() {
+  }
+
+  /**
+   * ( > {@link #getBufferedMemorySize} )
+   * @return the allocated size of the buffer
+   */
+  abstract public long getAllocatedSize();
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeByte(int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeBoolean(boolean v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeBytes(Binary v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeInteger(int v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeLong(long v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeDouble(double v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the value to encode
+   */
+  public void writeFloat(float v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  abstract public String memUsageString(String prefix);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
new file mode 100644
index 0000000..f713263
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -0,0 +1,91 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * a column reader that packs the ints in the number of bits required based on the maximum size.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BitPackingValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(BitPackingValuesReader.class);
+
+  private ByteArrayInputStream in;
+  private BitPackingReader bitPackingReader;
+  private final int bitsPerValue;
+  private int nextOffset;
+
+  /**
+   * @param bound the maximum value stored by this column
+   */
+  public BitPackingValuesReader(int bound) {
+    this.bitsPerValue = getWidthFromMaxInt(bound);
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesReader#readInteger()
+   */
+  @Override
+  public int readInteger() {
+    try {
+      return bitPackingReader.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    int effectiveBitLength = valueCount * bitsPerValue;
+    int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
+    if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
+    this.in = new ByteArrayInputStream(in, offset, length);
+    this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
+    this.nextOffset = offset + length;
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+
+  @Override
+  public void skip() {
+    readInteger();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
new file mode 100644
index 0000000..24436ef
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
@@ -0,0 +1,125 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.values.bitpacking.BitPacking.getBitPackingWriter;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * a column writer that packs the ints in the number of bits required based on the maximum size.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BitPackingValuesWriter extends ValuesWriter {
+
+  private CapacityByteArrayOutputStream out;
+  private BitPackingWriter bitPackingWriter;
+  private int bitsPerValue;
+
+  /**
+   * @param bound the maximum value stored by this column
+   * @param pageSize
+   */
+  public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
+    this.bitsPerValue = getWidthFromMaxInt(bound);
+    this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+    init();
+  }
+
+  private void init() {
+    this.bitPackingWriter = getBitPackingWriter(bitsPerValue, out);
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#writeInteger(int)
+   */
+  @Override
+  public void writeInteger(int v) {
+    try {
+      bitPackingWriter.write(v);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#getBufferedSize()
+   */
+  @Override
+  public long getBufferedSize() {
+    return out.size();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#getBytes()
+   */
+  @Override
+  public BytesInput getBytes() {
+    try {
+      this.bitPackingWriter.finish();
+      return BytesInput.from(out);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#reset()
+   */
+  @Override
+  public void reset() {
+    out.reset();
+    init();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#getAllocatedSize()
+   */
+  @Override
+  public long getAllocatedSize() {
+    return out.getCapacity();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return out.memUsageString(prefix);
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return BIT_PACKED;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
new file mode 100644
index 0000000..c0ab7e0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -0,0 +1,83 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+
+public class ByteBitPackingValuesReader extends ValuesReader {
+  private static final int VALUES_AT_A_TIME = 8; // because we're using unpack8Values()
+
+  private static final Log LOG = Log.getLog(ByteBitPackingValuesReader.class);
+
+  private final int bitWidth;
+  private final BytePacker packer;
+  private final int[] decoded = new int[VALUES_AT_A_TIME];
+  private int decodedPosition = VALUES_AT_A_TIME - 1;
+  private byte[] encoded;
+  private int encodedPos;
+  private int nextOffset;
+
+  public ByteBitPackingValuesReader(int bound, Packer packer) {
+    this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
+    this.packer = packer.newBytePacker(bitWidth);
+  }
+
+  @Override
+  public int readInteger() {
+    ++ decodedPosition;
+    if (decodedPosition == decoded.length) {
+      if (encodedPos + bitWidth > encoded.length) {
+        packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0);
+      } else {
+        packer.unpack8Values(encoded, encodedPos, decoded, 0);
+      }
+      encodedPos += bitWidth;
+      decodedPosition = 0;
+    }
+    return decoded[decodedPosition];
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    int effectiveBitLength = valueCount * bitWidth;
+    int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
+    if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitWidth + " bits." );
+    this.encoded = page;
+    this.encodedPos = offset;
+    this.decodedPosition = VALUES_AT_A_TIME - 1;
+    this.nextOffset = offset + length;
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+
+  @Override
+  public void skip() {
+    readInteger();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
new file mode 100644
index 0000000..d0240bb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
@@ -0,0 +1,86 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+public class ByteBitPackingValuesWriter extends ValuesWriter {
+
+  private final Packer packer;
+  private final int bitWidth;
+  private ByteBasedBitPackingEncoder encoder;
+
+  public ByteBitPackingValuesWriter(int bound, Packer packer) {
+    this.packer = packer;
+    this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
+    this.encoder = new ByteBasedBitPackingEncoder(bitWidth, packer);
+  }
+
+  @Override
+  public void writeInteger(int v) {
+    try {
+      this.encoder.writeInt(v);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return BIT_PACKED;
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    try {
+      return encoder.toBytes();
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  @Override
+  public void reset() {
+    encoder = new ByteBasedBitPackingEncoder(bitWidth, packer);
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return encoder.getBufferSize();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return encoder.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return encoder.memUsageString(prefix);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
new file mode 100644
index 0000000..865eea2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
@@ -0,0 +1,123 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import java.io.IOException;
+
+import org.apache.parquet.io.ParquetDecodingException;
+
+class BitReader {
+  private int currentByte = 0;
+  private int currentPosition = 8;
+  private byte[] buf;
+  private int currentBufferPosition = 0;
+  private static final int[] byteGetValueMask = new int[8];
+  private static final int[] readMask = new int[32];
+  private int endBufferPosistion;
+
+  static {
+    int currentMask = 1;
+    for (int i = 0; i < byteGetValueMask.length; i++) {
+      byteGetValueMask[i] = currentMask;
+      currentMask <<= 1;
+    }
+    currentMask = 0;
+    for (int i = 0; i < readMask.length; i++) {
+      readMask[i] = currentMask;
+      currentMask <<= 1;
+      currentMask += 1;
+    }
+  }
+
+  /**
+   * Prepare to deserialize bit-packed integers from the given array.
+   * The array is not copied, so must not be mutated during the course of
+   * reading.
+   */
+  public void prepare(byte[] buf, int offset, int length) {
+    this.buf = buf;
+    this.endBufferPosistion = offset + length;
+    currentByte = 0;
+    currentPosition = 8;
+    currentBufferPosition = offset;
+  }
+
+  /**
+   * Extract the given bit index from the given value.
+   */
+  private static boolean extractBit(int val, int bit) {
+    return (val & byteGetValueMask[bit]) != 0;
+  }
+
+  /**
+   * Read an integer from the stream which is represented by a specified
+   * number of bits.
+   * @param bitsPerValue the number of bits used to represent the integer
+   */
+  public int readNBitInteger(int bitsPerValue) {
+    int bits = bitsPerValue + currentPosition;
+    int currentValue = currentByte >>> currentPosition;
+    int toShift = 8 - currentPosition;
+    while (bits >= 8) {
+      currentByte = getNextByte();
+      currentValue |= currentByte << toShift;
+      toShift += 8;
+      bits -= 8;
+    }
+    currentValue &= readMask[bitsPerValue];
+    currentPosition = (bitsPerValue + currentPosition) % 8;
+    return currentValue;
+  }
+
+  private int getNextByte() {
+    if (currentBufferPosition < endBufferPosistion) {
+      return buf[currentBufferPosition++] & 0xFF;
+    }
+    return 0;
+  }
+
+  public boolean readBit() throws IOException {
+    if (currentPosition == 8) {
+      currentByte = getNextByte();
+      currentPosition = 0;
+    }
+    return extractBit(currentByte, currentPosition++);
+  }
+
+  public int readByte() {
+    currentByte |= (getNextByte() << 8);
+    int value = (currentByte >>> currentPosition) & 0xFF;
+    currentByte >>>= 8;
+    return value;
+  }
+
+  public int readUnsignedVarint() throws IOException {
+    int value = 0;
+    int i = 0;
+    int b;
+    while (((b = readByte()) & 0x80) != 0) {
+        value |= (b & 0x7F) << i;
+        i += 7;
+        if (i > 35) {
+            throw new ParquetDecodingException("Variable length quantity is too long");
+        }
+    }
+    return value | (b << i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
new file mode 100644
index 0000000..1d1d9d1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
@@ -0,0 +1,159 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+
+class BitWriter {
+  private static final Log LOG = Log.getLog(BitWriter.class);
+  private static final boolean DEBUG = false;//Log.DEBUG;
+
+  private CapacityByteArrayOutputStream baos;
+  private int currentByte = 0;
+  private int currentBytePosition = 0;
+  private static final int[] byteToTrueMask = new int[8];
+  private static final int[] byteToFalseMask = new int[8];
+  private boolean finished = false;
+  static {
+    int currentMask = 1;
+    for (int i = 0; i < byteToTrueMask.length; i++) {
+      byteToTrueMask[i] = currentMask;
+      byteToFalseMask[i] = ~currentMask;
+      currentMask <<= 1;
+    }
+  }
+
+  public BitWriter(int initialCapacity, int pageSize) {
+    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+  }
+
+  public void writeBit(boolean bit) {
+    if (DEBUG) LOG.debug("writing: " + (bit ? "1" : "0"));
+    currentByte = setBytePosition(currentByte, currentBytePosition++, bit);
+    if (currentBytePosition == 8) {
+      baos.write(currentByte);
+      if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+      currentByte = 0;
+      currentBytePosition = 0;
+    }
+  }
+
+  public void writeByte(int val) {
+    if (DEBUG) LOG.debug("writing: " + toBinary(val) + " (" + val + ")");
+    currentByte |= ((val & 0xFF) << currentBytePosition);
+    baos.write(currentByte);
+    if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+    currentByte >>>= 8;
+  }
+
+  /**
+   * Write the given integer, serialized using the given number of bits.
+   * It is assumed that the integer can be correctly serialized within
+   * the provided bit size.
+   * @param val the value to serialize
+   * @param bitsToWrite the number of bits to use
+   */
+  public void writeNBitInteger(int val, int bitsToWrite) {
+    if (DEBUG) LOG.debug("writing: " + toBinary(val, bitsToWrite) + " (" + val + ")");
+    val <<= currentBytePosition;
+    int upperByte = currentBytePosition + bitsToWrite;
+    currentByte |= val;
+    while (upperByte >= 8) {
+      baos.write(currentByte); //this only writes the lowest byte
+      if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+      upperByte -= 8;
+      currentByte >>>= 8;
+    }
+    currentBytePosition = (currentBytePosition + bitsToWrite) % 8;
+  }
+
+  private String toBinary(int val, int alignTo) {
+    String result = Integer.toBinaryString(val);
+    while (result.length() < alignTo) {
+      result = "0" + result;
+    }
+    return result;
+  }
+
+  private String toBinary(int val) {
+    return toBinary(val, 8);
+  }
+
+  public BytesInput finish() {
+    if (!finished) {
+      if (currentBytePosition > 0) {
+        baos.write(currentByte);
+        if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+      }
+    }
+    finished = true;
+    return BytesInput.from(baos);
+  }
+
+  public void reset() {
+    baos.reset();
+    currentByte = 0;
+    currentBytePosition = 0;
+    finished = false;
+  }
+
+  /**
+   * Set or clear the given bit position in the given byte.
+   * @param currentByte the byte to mutate
+   * @param bitOffset the bit to set or clear
+   * @param newBitValue whether to set or clear the bit
+   * @return the mutated byte
+   */
+  private static int setBytePosition(int currentByte, int bitOffset, boolean newBitValue) {
+    if (newBitValue) {
+      currentByte |= byteToTrueMask[bitOffset];
+    } else {
+      currentByte &= byteToFalseMask[bitOffset];
+    }
+    return currentByte;
+  }
+
+  //This assumes you will never give it a negative value
+  public void writeUnsignedVarint(int value) {
+    while ((value & 0xFFFFFF80) != 0L) {
+      writeByte((value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    writeByte(value & 0x7F);
+  }
+
+  public int getMemSize() {
+    // baos = 8 bytes
+    // currentByte + currentBytePosition = 8 bytes
+    // the size of baos:
+    //   count : 4 bytes (rounded to 8)
+    //   buf : 12 bytes (8 ptr + 4 length) should technically be rounded to 8 depending on buffer size
+    return 32 + (int)baos.size();
+  }
+
+  public int getCapacity() {
+    return baos.getCapacity();
+  }
+
+  public String memUsageString(String prefix) {
+    return baos.memUsageString(prefix);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
new file mode 100644
index 0000000..3ace6e2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
@@ -0,0 +1,32 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+
+public abstract class BoundedIntValuesFactory {
+  public static ValuesReader getBoundedReader(int bound) {
+    return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
+  }
+
+  public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
+    return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
new file mode 100644
index 0000000..dda4431
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
@@ -0,0 +1,93 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * @see BoundedIntValuesWriter
+ */
+class BoundedIntValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(BoundedIntValuesReader.class);
+
+  private int currentValueCt = 0;
+  private int currentValue = 0;
+  private final int bitsPerValue;
+  private BitReader bitReader = new BitReader();
+  private int nextOffset;
+
+  public BoundedIntValuesReader(int bound) {
+    if (bound == 0) {
+      throw new ParquetDecodingException("Value bound cannot be 0. Use DevNullColumnReader instead.");
+    }
+    bitsPerValue = BytesUtils.getWidthFromMaxInt(bound);
+  }
+
+  @Override
+  public int readInteger() {
+    try {
+      if (currentValueCt > 0) {
+        currentValueCt--;
+        return currentValue;
+      }
+      if (bitReader.readBit()) {
+        currentValue = bitReader.readNBitInteger(bitsPerValue);
+        currentValueCt = bitReader.readUnsignedVarint() - 1;
+      } else {
+        currentValue = bitReader.readNBitInteger(bitsPerValue);
+      }
+      return currentValue;
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read int", e);
+    }
+  }
+
+  // This forces it to deserialize into memory. If it wanted
+  // to, it could just read the bytes (though that number of
+  // bytes would have to be serialized). This is the flip-side
+  // to BoundedIntColumnWriter.writeData(BytesOutput)
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
+    int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
+    if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
+    currentValueCt = 0;
+    currentValue = 0;
+    bitReader.prepare(in, offset + 4, totalBytes);
+    if (DEBUG) LOG.debug("will read next from " + (offset + totalBytes + 4));
+    this.nextOffset = offset + totalBytes + 4;
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return this.nextOffset;
+  }
+
+  @Override
+  public void skip() {
+    readInteger();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
new file mode 100644
index 0000000..0acaaf7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
@@ -0,0 +1,159 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.bytes.BytesInput.concat;
+import static org.apache.parquet.column.Encoding.RLE;
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BitPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * This is a special ColumnWriter for the case when you need to write
+ * integers in a known range. This is intended primarily for use with
+ * repetition and definition levels, since the maximum value that will
+ * be written is known a priori based on the schema. Assumption is that
+ * the values written are between 0 and the bound, inclusive.
+ *
+ * This differs from {@link BitPackingValuesWriter} in that this also performs
+ * run-length encoding of the data, so is useful when long runs of repeated
+ * values are expected.
+ */
+class BoundedIntValuesWriter extends ValuesWriter {
+  private static final Log LOG = Log.getLog(BoundedIntValuesWriter.class);
+
+  private int currentValue = -1;
+  private int currentValueCt = -1;
+  private boolean currentValueIsRepeated = false;
+  private boolean thereIsABufferedValue = false;
+  private int shouldRepeatThreshold = 0;
+  private int bitsPerValue;
+  private BitWriter bitWriter;
+  private boolean isFirst = true;
+
+  private static final int[] byteToTrueMask = new int[8];
+  static {
+    int currentMask = 1;
+    for (int i = 0; i < byteToTrueMask.length; i++) {
+      byteToTrueMask[i] = currentMask;
+      currentMask <<= 1;
+    }
+  }
+
+  public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
+    if (bound == 0) {
+      throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
+    }
+    this.bitWriter = new BitWriter(initialCapacity, pageSize);
+    bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
+    shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
+    if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
+  }
+
+  @Override
+  public long getBufferedSize() {
+    // currentValue + currentValueCt = 8 bytes
+    // shouldRepeatThreshold + bitsPerValue = 8 bytes
+    // bitWriter = 8 bytes
+    // currentValueIsRepeated + isFirst = 2 bytes (rounded to 8 b/c of word boundaries)
+    return 32 + (bitWriter == null ? 0 : bitWriter.getMemSize());
+  }
+
+  // This assumes that the full state must be serialized, since there is no close method
+  @Override
+  public BytesInput getBytes() {
+    serializeCurrentValue();
+    BytesInput buf = bitWriter.finish();
+    if (Log.DEBUG) LOG.debug("writing a buffer of size " + buf.size() + " + 4 bytes");
+    // We serialize the length so that on deserialization we can
+    // deserialize as we go, instead of having to load everything
+    // into memory
+    return concat(BytesInput.fromInt((int)buf.size()), buf);
+  }
+
+  @Override
+  public void reset() {
+    currentValue = -1;
+    currentValueCt = -1;
+    currentValueIsRepeated = false;
+    thereIsABufferedValue = false;
+    isFirst = true;
+    bitWriter.reset();
+  }
+
+  @Override
+  public void writeInteger(int val) {
+    if (currentValue == val) {
+      currentValueCt++;
+      if (!currentValueIsRepeated && currentValueCt >= shouldRepeatThreshold) {
+        currentValueIsRepeated = true;
+      }
+    } else {
+      if (!isFirst) {
+        serializeCurrentValue();
+      } else {
+        isFirst = false;
+      }
+
+      newCurrentValue(val);
+    }
+  }
+
+  private void serializeCurrentValue() {
+    if (thereIsABufferedValue) {
+      if (currentValueIsRepeated) {
+        bitWriter.writeBit(true);
+        bitWriter.writeNBitInteger(currentValue, bitsPerValue);
+        bitWriter.writeUnsignedVarint(currentValueCt);
+      } else {
+        for (int i = 0; i < currentValueCt; i++) {
+          bitWriter.writeBit(false);
+          bitWriter.writeNBitInteger(currentValue, bitsPerValue);
+        }
+      }
+    }
+    thereIsABufferedValue = false;
+  }
+
+  private void newCurrentValue(int val) {
+    currentValue = val;
+    currentValueCt = 1;
+    currentValueIsRepeated = false;
+    thereIsABufferedValue = true;
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return bitWriter.getCapacity();
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return RLE;
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return bitWriter.memUsageString(prefix);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
new file mode 100644
index 0000000..af92941
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
@@ -0,0 +1,89 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * This is a special writer that doesn't write anything. The idea being that
+ * some columns will always be the same value, and this will capture that. An
+ * example is the set of repetition levels for a schema with no repeated fields.
+ */
+public class DevNullValuesWriter extends ValuesWriter {
+  @Override
+  public long getBufferedSize() {
+    return 0;
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  @Override
+  public void writeInteger(int v) {
+  }
+
+  @Override
+  public void writeByte(int value) {
+  }
+
+  @Override
+  public void writeBoolean(boolean v) {
+  }
+
+  @Override
+  public void writeBytes(Binary v) {
+  }
+
+  @Override
+  public void writeLong(long v) {
+  }
+
+  @Override
+  public void writeDouble(double v) {
+  }
+
+  @Override
+  public void writeFloat(float v) {
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    return BytesInput.empty();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return 0;
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return BIT_PACKED;
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return prefix + "0";
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
new file mode 100644
index 0000000..9201596
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
@@ -0,0 +1,52 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import java.io.IOException;
+
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * ColumnReader which does not read any actual data, but rather simply produces
+ * an endless stream of constant values.
+ * Mainly used to read definition levels when the only possible value is 0
+ */
+public class ZeroIntegerValuesReader extends ValuesReader {
+  
+  private int nextOffset;
+
+  public int readInteger() {
+    return 0;
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    this.nextOffset = offset;
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+
+  @Override
+  public void skip() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
new file mode 100644
index 0000000..565d6ec
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
@@ -0,0 +1,57 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Config for delta binary packing
+ *
+ * @author Tianshuo Deng
+ */
+class DeltaBinaryPackingConfig {
+  final int blockSizeInValues;
+  final int miniBlockNumInABlock;
+  final int miniBlockSizeInValues;
+
+  public DeltaBinaryPackingConfig(int blockSizeInValues, int miniBlockNumInABlock) {
+    this.blockSizeInValues = blockSizeInValues;
+    this.miniBlockNumInABlock = miniBlockNumInABlock;
+    double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+    Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+    this.miniBlockSizeInValues = (int) miniSize;
+  }
+
+  public static DeltaBinaryPackingConfig readConfig(InputStream in) throws IOException {
+    return new DeltaBinaryPackingConfig(BytesUtils.readUnsignedVarInt(in),
+            BytesUtils.readUnsignedVarInt(in));
+  }
+
+  public BytesInput toBytesInput() {
+    return BytesInput.concat(
+            BytesInput.fromUnsignedVarInt(blockSizeInValues),
+            BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
new file mode 100644
index 0000000..c1678ae
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -0,0 +1,170 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Read values written by {@link DeltaBinaryPackingValuesWriter}
+ *
+ * @author Tianshuo Deng
+ */
+public class DeltaBinaryPackingValuesReader extends ValuesReader {
+  private int totalValueCount;
+  /**
+   * values read by the caller
+   */
+  private int valuesRead;
+  private int minDeltaInCurrentBlock;
+  private byte[] page;
+  /**
+   * stores the decoded values including the first value which is written to the header
+   */
+  private int[] valuesBuffer;
+  /**
+   * values loaded to the buffer, it could be bigger than the totalValueCount
+   * when data is not aligned to mini block, which means padding 0s are in the buffer
+   */
+  private int valuesBuffered;
+  private ByteArrayInputStream in;
+  private int nextOffset;
+  private DeltaBinaryPackingConfig config;
+  private int[] bitWidths;
+
+  /**
+   * eagerly load all the data into memory
+   *
+   * @param valueCount count of values in this page
+   * @param page       the array to read from containing the page data (repetition levels, definition levels, data)
+   * @param offset     where to start reading from in the page
+   * @throws IOException
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+    in = new ByteArrayInputStream(page, offset, page.length - offset);
+    this.config = DeltaBinaryPackingConfig.readConfig(in);
+    this.page = page;
+    this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
+    allocateValuesBuffer();
+    bitWidths = new int[config.miniBlockNumInABlock];
+
+    //read first value from header
+    valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarInt(in);
+
+    while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
+      loadNewBlockToBuffer();
+    }
+    this.nextOffset = page.length - in.available();
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+  
+  /**
+   * the value buffer is allocated so that the size of it is multiple of mini block
+   * because when writing, data is flushed on a mini block basis
+   */
+  private void allocateValuesBuffer() {
+    int totalMiniBlockCount = (int) Math.ceil((double) totalValueCount / config.miniBlockSizeInValues);
+    //+ 1 because first value written to header is also stored in values buffer
+    valuesBuffer = new int[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
+  }
+
+  @Override
+  public void skip() {
+    checkRead();
+    valuesRead++;
+  }
+
+  @Override
+  public int readInteger() {
+    checkRead();
+    return valuesBuffer[valuesRead++];
+  }
+
+  private void checkRead() {
+    if (valuesRead >= totalValueCount) {
+      throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
+    }
+  }
+
+  private void loadNewBlockToBuffer() {
+    try {
+      minDeltaInCurrentBlock = BytesUtils.readZigZagVarInt(in);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("can not read min delta in current block", e);
+    }
+
+    readBitWidthsForMiniBlocks();
+
+    // mini block is atomic for reading, we read a mini block when there are more values left
+    int i;
+    for (i = 0; i < config.miniBlockNumInABlock && valuesBuffered < totalValueCount; i++) {
+      BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidths[i]);
+      unpackMiniBlock(packer);
+    }
+
+    //calculate values from deltas unpacked for current block
+    int valueUnpacked=i*config.miniBlockSizeInValues;
+    for (int j = valuesBuffered-valueUnpacked; j < valuesBuffered; j++) {
+      int index = j;
+      valuesBuffer[index] += minDeltaInCurrentBlock + valuesBuffer[index - 1];
+    }
+  }
+
+  /**
+   * mini block has a size of 8*n, unpack 8 value each time
+   *
+   * @param packer the packer created from bitwidth of current mini block
+   */
+  private void unpackMiniBlock(BytePacker packer) {
+    for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
+      unpack8Values(packer);
+    }
+  }
+
+  private void unpack8Values(BytePacker packer) {
+    //calculate the pos because the packer api uses array not stream
+    int pos = page.length - in.available();
+    packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+    this.valuesBuffered += 8;
+    //sync the pos in stream
+    in.skip(packer.getBitWidth());
+  }
+
+  private void readBitWidthsForMiniBlocks() {
+    for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+      try {
+        bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(in);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Can not decode bitwidth in block header", e);
+      }
+    }
+  }
+}